Skip to content

Add npm #1037

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
May 18, 2023
Merged

Add npm #1037

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ share-component-source/packages/*
wui-1.7.0-console.zip
wui1.7.0-debug2.zip
env/
.vs/*
.vs/*
wingetui/components/package-lock.json
wingetui/components/package.json
131 changes: 71 additions & 60 deletions wingetui/PackageManagers/npm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@
from .sampleHelper import *


class ScoopPackageManager(DynamicLoadPackageManager):
class NPMPackageManager(DynamicLoadPackageManager):

ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')

EXECUTABLE = "npm"

NAME = "NPM"
NAME = "Npm"
CACHE_FILE = os.path.join(os.path.expanduser("~"), f".wingetui/cacheddata/{NAME}CachedPackages")
CACHE_FILE_PATH = os.path.join(os.path.expanduser("~"), ".wingetui/cacheddata")

BLACKLISTED_PACKAGE_NAMES = ["WARNING:", "[notice]", "Package"]
BLACKLISTED_PACKAGE_IDS = ["WARNING:", "[notice]", "Package"]
BLACKLISTED_PACKAGE_VERSIONS = ["Ignoring", "invalie"]
BLACKLISTED_PACKAGE_NAMES = []
BLACKLISTED_PACKAGE_IDS = []
BLACKLISTED_PACKAGE_VERSIONS = []

icon = None

Expand All @@ -35,7 +35,7 @@ def getPackagesForQuery(self, query: str) -> list[Package]:
print(f"🔵 Starting {self.NAME} search for dynamic packages")
try:
packages: list[Package] = []
p = subprocess.Popen(f"{self.EXECUTABLE} search {query}", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen(f"{self.EXECUTABLE} search {query}", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.path.expanduser("~"), env=os.environ.copy(), shell=True)
DashesPassed = False
while p.poll() is None:
line: str = str(p.stdout.readline().strip(), "utf-8", errors="ignore")
Expand All @@ -46,9 +46,9 @@ def getPackagesForQuery(self, query: str) -> list[Package]:
else:
package = list(filter(None, line.split("|")))
if len(package) >= 4:
name = formatPackageIdAsName(package[0][1:] if package[0][0] == "@" else package[0])
id = package[0]
version = package[4]
name = formatPackageIdAsName(package[0][1:] if package[0][0] == "@" else package[0]).strip()
id = package[0].strip()
version = package[4].strip()
source = self.NAME
if not name in self.BLACKLISTED_PACKAGE_NAMES and not id in self.BLACKLISTED_PACKAGE_IDS and not version in self.BLACKLISTED_PACKAGE_VERSIONS:
packages.append(Package(name, id, version, source, Npm))
Expand All @@ -65,7 +65,7 @@ def getAvailableUpdates(self) -> list[UpgradablePackage]:
print(f"🔵 Starting {self.NAME} search for updates")
try:
packages: list[UpgradablePackage] = []
p = subprocess.Popen(f"{self.EXECUTABLE} outdated", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen(f"{self.EXECUTABLE} outdated", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.path.expanduser("~"), env=os.environ.copy(), shell=True)
DashesPassed = False
while p.poll() is None:
line: str = str(p.stdout.readline().strip(), "utf-8", errors="ignore")
Expand All @@ -76,10 +76,10 @@ def getAvailableUpdates(self) -> list[UpgradablePackage]:
else:
package = list(filter(None, line.split(" ")))
if len(package) >= 4:
name = formatPackageIdAsName(package[0][1:] if package[0][0] == "@" else package[0])
id = package[0]
version = package[1]
newVersion = package[3]
name = formatPackageIdAsName(package[0][1:] if package[0][0] == "@" else package[0]).strip()
id = package[0].strip()
version = package[1].strip()
newVersion = package[3].strip()
source = self.NAME
if not name in self.BLACKLISTED_PACKAGE_NAMES and not id in self.BLACKLISTED_PACKAGE_IDS and not version in self.BLACKLISTED_PACKAGE_VERSIONS and not newVersion in self.BLACKLISTED_PACKAGE_VERSIONS:
packages.append(UpgradablePackage(name, id, version, newVersion, source, Npm))
Expand All @@ -96,7 +96,8 @@ def getInstalledPackages(self) -> list[Package]:
print(f"🔵 Starting {self.NAME} search for installed packages")
try:
packages: list[Package] = []
p = subprocess.Popen(f"{self.EXECUTABLE} list", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
p = subprocess.Popen(f"{self.EXECUTABLE} list", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.path.expanduser("~"), env=os.environ.copy(), shell=True)
currentScope = ""
while p.poll() is None:
line: str = str(p.stdout.readline().strip(), "utf-8", errors="ignore")
if line and len(line) > 4:
Expand All @@ -105,14 +106,15 @@ def getInstalledPackages(self) -> list[Package]:
package = line.split("@")
if len(package) >= 2:
idString = '@'.join(package[:-1]).strip()
name = formatPackageIdAsName(idString[1:] if idString[0] == "@" else idString)
id = idString
version = package[-1]
name = formatPackageIdAsName(idString[1:] if idString[0] == "@" else idString).strip()
id = idString.strip()
version = package[-1].strip()
if not name in self.BLACKLISTED_PACKAGE_NAMES and not id in self.BLACKLISTED_PACKAGE_IDS and not version in self.BLACKLISTED_PACKAGE_VERSIONS:
packages.append(Package(name, id, version, self.NAME, Npm))
else:
print(line)
p = subprocess.Popen(f"{self.EXECUTABLE} list -g", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ.copy(), shell=True)
packages.append(Package(name, id, version, self.NAME+currentScope, Npm))
elif "@" in line.split(" ")[0]:
currentScope = "@"+line.split(" ")[0][:-1]
print("🔵 NPM changed scope to", currentScope)
p = subprocess.Popen(f"{self.EXECUTABLE} list -g", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.path.expanduser("~"), env=os.environ.copy(), shell=True)
while p.poll() is None:
line: str = str(p.stdout.readline().strip(), "utf-8", errors="ignore")
if line and len(line) > 4:
Expand All @@ -125,9 +127,7 @@ def getInstalledPackages(self) -> list[Package]:
id = idString
version = package[-1].strip()
if not name in self.BLACKLISTED_PACKAGE_NAMES and not id in self.BLACKLISTED_PACKAGE_IDS and not version in self.BLACKLISTED_PACKAGE_VERSIONS:
packages.append(Package(name, id, version, self.NAME, Npm))
else:
print(line)
packages.append(Package(name, id, version, self.NAME+"@global", Npm))
print(f"🟢 {self.NAME} search for installed packages finished with {len(packages)} result(s)")
return packages
except Exception as e:
Expand All @@ -141,37 +141,42 @@ def getPackageDetails(self, package: Package) -> PackageDetails:
print(f"🔵 Starting get info for {package.Name} on {self.NAME}")
details = PackageDetails(package)
try:
details.ManifestUrl = f"https://pypi.org/project/{package.Id}/"
details.ReleaseNotesUrl = f"https://pypi.org/project/{package.Id}/#history"
details.InstallerURL = f"https://pypi.org/project/{package.Id}/#files"
details.Scopes = [_("User")]
details.InstallerType = "NPM"

p = subprocess.Popen(f"{self.EXECUTABLE} show {package.Id} -v", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.getcwd(), env=os.environ, shell=True)
details.InstallerType = "Tarball"
details.ManifestUrl = f"https://www.npmjs.com/package/{package.Id}"
details.ReleaseNotesUrl = f"https://www.npmjs.com/package/{package.Id}?activeTab=versions"
details.Scopes = ["Global"]
p = subprocess.Popen(f"{self.EXECUTABLE} info {package.Id}", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, cwd=os.path.expanduser("~"), env=os.environ, shell=True)
output: list[str] = []
while p.poll() is None:
line = p.stdout.readline()
if line:
output.append(str(line, encoding='utf-8', errors="ignore"))
output.append(str(line, encoding='utf-8', errors="ignore").strip())
lineNo = 0
ReadingMaintainer = False
for line in output:
if "Name:" in line:
details.Name = formatPackageIdAsName(line.replace("Name:", "").strip()) if "-" in line else line.replace("Name:", "").strip()
elif "Author:" in line:
details.Author = line.replace("Author:", "").strip()
elif "Home-page:" in line:
details.HomepageURL = line.replace("Home-page:", "").strip()
elif "License:" in line:
details.License = line.replace("License:", "").strip()
elif "License ::" in line:
details.License = line.split("::")[-1].strip()
elif "Summary:" in line:
details.Description = line.replace("Summary:", "").strip()
elif "Release Notes" in line:
details.ReleaseNotesUrl = line.replace("Release Notes:", "").strip()
elif "Topic ::" in line:
if line.split("::")[-1].strip() not in details.Tags:
details.Tags.append(line.split("::")[-1].strip())

lineNo += 1
if lineNo == 2:
details.License = line.split("|")[1]
elif lineNo == 3:
details.Description = line.strip()
elif line == 4:
details.HomepageURL = line.strip()
elif line.startswith(".tarball"):
details.InstallerURL = line.replace(".tarball: ", "").strip()
try:
details.InstallerSize = int(urlopen(details.InstallerURL).length/1000000)
except Exception as e:
print("🟠 Can't get installer size:", type(e), str(e))
elif line.startswith(".integrity"):
details.InstallerHash = "<br>"+line.replace(".integrity: sha512-", "").replace("==", "").strip()
elif line.startswith("maintainers:"):
ReadingMaintainer = True
elif ReadingMaintainer:
ReadingMaintainer = False
details.Author = line.replace("-", "").split("<")[0].strip()
elif line.startswith("published"):
details.Publisher = line.split("by")[-1].split("<")[0].strip()
details.UpdateDate = line.split("by")[0].replace("published", "").strip()
print(f"🟢 Get info finished for {package.Name} on {self.NAME}")
return details
except Exception as e:
Expand All @@ -183,7 +188,7 @@ def getIcon(self, source: str) -> QIcon:
self.icon = QIcon(getMedia("node"))
return self.icon

def getParameters(self, options: InstallationOptions, removeprogressbar: bool = True) -> list[str]:
def getParameters(self, options: InstallationOptions) -> list[str]:
Parameters: list[str] = []
if options.CustomParameters:
Parameters += options.CustomParameters
Expand All @@ -194,19 +199,23 @@ def getParameters(self, options: InstallationOptions, removeprogressbar: bool =
return Parameters

def startInstallation(self, package: Package, options: InstallationOptions, widget: InstallationWidgetType) -> subprocess.Popen:
Command = [self.EXECUTABLE, "install", package.Id] + self.getParameters(options)
if "@global" in package.Source:
options.InstallationScope = "Global"
Command = ["cmd.exe", "/C", self.EXECUTABLE, "install", package.Id+"@latest"] + self.getParameters(options)
if options.RunAsAdministrator:
Command = [GSUDO_EXECUTABLE] + Command
print(f"🔵 Starting {package} installation with Command", Command)
p = subprocess.Popen(Command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True, cwd=GSUDO_EXE_LOCATION, env=os.environ)
p = subprocess.Popen(Command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True, cwd=os.path.expanduser("~"), env=os.environ)
Thread(target=self.installationThread, args=(p, options, widget,), name=f"{self.NAME} installation thread: installing {package.Name}").start()

def startUpdate(self, package: Package, options: InstallationOptions, widget: InstallationWidgetType) -> subprocess.Popen:
Command = [self.EXECUTABLE, "update", package.Id] + self.getParameters(options)
def startUpdate(self, package: UpgradablePackage, options: InstallationOptions, widget: InstallationWidgetType) -> subprocess.Popen:
if "@global" in package.Source:
options.InstallationScope = "Global"
Command = ["cmd.exe", "/C", self.EXECUTABLE, "install", package.Id+"@"+package.NewVersion] + self.getParameters(options)
if options.RunAsAdministrator:
Command = [GSUDO_EXECUTABLE] + Command
print(f"🔵 Starting {package} update with Command", Command)
p = subprocess.Popen(Command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True, cwd=GSUDO_EXE_LOCATION, env=os.environ)
p = subprocess.Popen(Command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True, cwd=os.path.expanduser("~"), env=os.environ)
Thread(target=self.installationThread, args=(p, options, widget,), name=f"{self.NAME} installation thread: update {package.Name}").start()

def installationThread(self, p: subprocess.Popen, options: InstallationOptions, widget: InstallationWidgetType):
Expand All @@ -227,11 +236,13 @@ def installationThread(self, p: subprocess.Popen, options: InstallationOptions,
widget.finishInstallation.emit(outputCode, output)

def startUninstallation(self, package: Package, options: InstallationOptions, widget: InstallationWidgetType) -> subprocess.Popen:
Command = [self.EXECUTABLE, "uninstall", package.Id, "-y"] + self.getParameters(options, removeprogressbar=False)
if "@global" in package.Source:
options.InstallationScope = "Global"
Command = [self.EXECUTABLE, "uninstall", ] + self.getParameters(options)
if options.RunAsAdministrator:
Command = [GSUDO_EXECUTABLE] + Command
print(f"🔵 Starting {package} uninstall with Command", Command)
p = subprocess.Popen(Command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True, cwd=GSUDO_EXE_LOCATION, env=os.environ)
p = subprocess.Popen(" ".join(Command), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, shell=True, cwd=os.path.expanduser("~"), env=os.environ)
Thread(target=self.uninstallationThread, args=(p, options, widget,), name=f"{self.NAME} installation thread: uninstall {package.Name}").start()

def uninstallationThread(self, p: subprocess.Popen, options: InstallationOptions, widget: InstallationWidgetType):
Expand Down Expand Up @@ -263,4 +274,4 @@ def updateSources(self, signal: Signal = None) -> None:
if signal:
signal.emit()

Npm = ScoopPackageManager()
Npm = NPMPackageManager()
4 changes: 2 additions & 2 deletions wingetui/PackageManagers/pip.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .sampleHelper import *


class ScoopPackageManager(DynamicLoadPackageManager):
class PipPackageManager(DynamicLoadPackageManager):

ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]')

Expand Down Expand Up @@ -254,4 +254,4 @@ def updateSources(self, signal: Signal = None) -> None:
if signal:
signal.emit()

Pip = ScoopPackageManager()
Pip = PipPackageManager()
3 changes: 1 addition & 2 deletions wingetui/PackageManagers/sampleHelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def updateSources(self, signal: Signal = None) -> None:
subprocess.run(f"{self.EXECUTABLE} update self", shell=True, stdout=subprocess.PIPE)
if signal:
signal.emit()

class DynamicLoadPackageManager(SamplePackageManager):

def getAvailablePackages(self, second_attempt: bool = False) -> list[Package]:
Expand All @@ -281,6 +281,5 @@ def getPackagesForQuery(self, query: str) -> list[Package]:
"""
raise NotImplementedError("This method must be reimplemented")


if(__name__=="__main__"):
import __init__
Loading