-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathutils.py
127 lines (111 loc) · 3.98 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import concurrent.futures as future
import os
from pathlib import Path
from subprocess import PIPE, CalledProcessError, Popen, check_output
from tempfile import TemporaryDirectory
from cv2 import cv2
from pdf2image import convert_from_path
from tesseract import Tesseract, PageSegMode
# Names exported by `from <module> import *`; the underscore-prefixed helpers
# defined in this file are intentionally left out.
__all__ = [
"pdf_to_text",
"ocr_to_text",
"get_page_count",
]
# Module-level Tesseract handle, created once at import time and used by
# `_get_tesseract_text` for every image it processes.
TESS = Tesseract()
def pdf_to_text(pdf_path: str, target_dir: str):
    """
    Convert pdf at `pdf_path` to a txt file in `target_dir` using XpdfReader's pdftotext.

    The output file is named after the stem of the pdf, i.e.
    `<target_dir>/<stem of pdf_path>.txt`, and `pdftotext` is invoked with
    the `-layout` option.

    Args
    ----
    :param pdf_path: ---> str: the path to a pdf document.
    :param target_dir: ---> str: the directory that receives the txt file.

    Returns
    -------
    The raw stderr bytes of `pdftotext` when it exits with a non-zero status,
    otherwise an empty string.
    """
    file_name = Path(pdf_path).stem
    command = [
        "pdftotext",
        "-layout",
        pdf_path,
        str(Path(target_dir) / f"{file_name}.txt"),
    ]
    proc = Popen(command, stdout=PIPE, stderr=PIPE)
    # communicate() drains both pipes while waiting for the child to exit.
    # Calling wait() first can deadlock as soon as the child writes more than
    # a pipe buffer's worth of output, because nothing is reading the pipe.
    _, stderr = proc.communicate()
    if proc.returncode:
        return stderr
    return ""
def _get_tesseract_text(img_path: str, **kwargs):
    """
    Use Tesseract API to get the text from the images directly.

    Keywords
    --------
    A dictionary of key, val that Tesseract API can accept. Two keys are also
    interpreted here: `grayscale` (load the image as a single channel,
    default False) and `psm` (page segmentation mode, default
    `PageSegMode.AUTO`). Every key/value pair, those two included, is passed
    to `TESS.set_variable` with the value converted to `str`.

    Returns
    -------
    Whatever `TESS.get_text()` returns for the image.

    Raises
    ------
    OSError: if OpenCV could not load `img_path`.
    """
    grayscale = kwargs.get("grayscale", False)
    psm = int(kwargs.get("psm", PageSegMode.AUTO))
    read_flag = cv2.IMREAD_GRAYSCALE if grayscale else cv2.IMREAD_COLOR
    imcv = cv2.imread(img_path, read_flag)
    if imcv is None:
        # cv2.imread reports a missing or undecodable file by returning None
        # rather than raising; fail here with the path instead of an
        # AttributeError on `.shape` below.
        raise OSError(f"could not read image: {img_path}")
    for key, val in kwargs.items():
        TESS.set_variable(key, str(val))
    TESS.set_psm(psm)
    # A grayscale image has a 2-tuple shape (no channel axis); treat it as depth 1.
    height, width, *rest = imcv.shape
    depth = rest[0] if rest else 1
    TESS.set_image(imcv.ctypes, width, height, depth)
    return TESS.get_text()
def _wrap_get_tesseract_text(img_path: str, kwargs):
    """
    Adapter around `_get_tesseract_text` for multiprocessing/concurrency.

    Takes the Tesseract options as one positional dictionary and expands it
    into keyword arguments for `_get_tesseract_text`, returning its result.
    """
    text = _get_tesseract_text(img_path, **kwargs)
    return text
def ocr_to_text(pdf_path: str, batch_size: int = 10, first_page: int = 1, last_page: int = 0, **kwargs):
    """
    Convert ocr to text using pdf2image, cv2 and tesseract api.
    `kwargs` belong to the function `_get_tesseract_text`; the keys
    `user_defined_dpi` (default 250) and `grayscale` (default False) are also
    used here to render the pages.

    This is a generator. Pages are rendered to tiff files in a temporary
    directory one batch at a time, each batch is OCR'd in a process pool, and
    one `(text, page_number)` tuple is yielded per page in the order the
    workers finish (not necessarily page order). A page whose OCR raises is
    reported with `print` and skipped.

    Args
    ----
    :param pdf_path: ---> str: the path to a pdf document.
    :param batch_size: ---> int: size of batches of converted pages
    fed into `_get_tesseract_text`. Raised to the cpu count when smaller.
    :param first_page: ---> int: first page to process (1-based).
    :param last_page: ---> int: last page to process; a value <= 0 is taken
    as an offset from the page count reported by `get_page_count`.
    """
    resolution = kwargs.get("user_defined_dpi", 250)
    grayscale = kwargs.get("grayscale", False)
    if last_page <= 0:
        last_page = get_page_count(pdf_path) + last_page
    # os.cpu_count() returns None when the count cannot be determined.
    cpus = os.cpu_count() or 1
    # To use up all cpus
    if cpus > batch_size:
        batch_size = cpus
    for batch_start in range(first_page, last_page + 1, batch_size):
        with TemporaryDirectory() as path:
            path_to_pages = convert_from_path(
                pdf_path,
                output_folder=path,
                fmt="tiff",
                dpi=int(resolution),
                first_page=batch_start,
                last_page=min(batch_start + batch_size - 1, last_page),
                paths_only=True,
                grayscale=bool(grayscale),
            )
            with future.ProcessPoolExecutor(max_workers=cpus) as executor:
                # Number pages from the start of the batch so the result stays
                # correct when `first_page` is not 1.
                # NOTE(review): assumes convert_from_path returns the paths in
                # page order - confirm against pdf2image.
                tasks = {
                    executor.submit(_wrap_get_tesseract_text, img_path, kwargs): batch_start + offset
                    for offset, img_path in enumerate(path_to_pages)
                }
                for done in future.as_completed(tasks):
                    page_number = tasks[done]
                    try:
                        text = done.result()
                    except Exception as e:
                        print(f"page #{page_number} generated an exception: {e}")
                    else:
                        # Yield outside the try so an exception thrown into
                        # the generator is not mistaken for an OCR failure.
                        yield text, page_number
def get_page_count(pdf_path: str):
    """
    Use XpdfReader's pdfinfo to extract the number of pages in a pdf file.

    Args
    ----
    :param pdf_path: ---> str: the path to a pdf document.

    Returns
    -------
    The page count as an int, or 0 when `pdfinfo` exits with an error or its
    output contains no parsable `Pages:` field.
    """
    try:
        raw = check_output(["pdfinfo", pdf_path])
    except CalledProcessError:
        return 0
    # Other fields (title, author, ...) may hold bytes that are not valid
    # UTF-8; replace them instead of giving up on the page count.
    output = raw.decode(errors="replace")
    for line in output.splitlines():
        # Match the field name at the start of the line so that another field
        # whose value happens to contain "Pages:" is not picked up.
        if line.startswith("Pages:"):
            try:
                return int(line.split(":", 1)[1])
            except ValueError:
                return 0
    return 0