Developed 1 new feature, fixed 2 bugs #65
base: master
Changes from 6 commits
Commits: 03d7ba7, 8ec66dd, db3b424, 9951b41, 54c32ac, d4f4a3e, f8f314d
@@ -115,9 +115,9 @@ def __init__(self, cookie_filename=None, user_agent=None, timeout=None, **kwargs
self.browser.set_handle_redirect(True)
self.browser.set_handle_referer(True)
self.browser.set_handle_robots(False)
self.browser.addheaders = [
('User-agent', user_agent)]
self.browser.addheaders = [('User-agent', user_agent)]
self.proxies = {}

if timeout is None:
self._default_timout = mechanize._sockettimeout._GLOBAL_DEFAULT_TIMEOUT
else:
@@ -169,14 +169,17 @@ def _clear_content(self):
del self.content

def close(self):
"""
clear browse history, avoid memory issue
"""
self._clear_content()
resp = self.browser.response()
if resp is not None:
resp.close()
self.browser.clear_history()

class SpynnerOpener(Opener):
def __init__(self, user_agent=None, **kwargs):
def __init__(self, user_agent=None, timeout=30, **kwargs):
try:
import spynner
except ImportError:
@@ -185,8 +188,9 @@ def __init__(self, user_agent=None, **kwargs):
if user_agent is None:
user_agent = 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)'

self.br = spynner.Browser(user_agent=user_agent, **kwargs)

self.br = spynner.Browser(user_agent=user_agent)
self._default_timout = timeout

def spynner_open(self, url, data=None, headers=None, method='GET',
wait_for_text=None, wait_for_selector=None, tries=None):
try:

@@ -207,7 +211,7 @@ def wait_callback(br):
if method == 'POST':
operation = QNetworkAccessManager.PostOperation
self.br.load(url, wait_callback=wait_callback, tries=tries,
operation=operation, body=data, headers=headers)
operation=operation, body=data, headers=headers,load_timeout= self._default_timout)
Review comments:
- Same as before: add a space after the comma.
- Agreed.
- For the changes that are clearly needed (such as formatting), please push a commit fixing them first.
- The space here still needs to be added — please push the change again.

return self.br
@@ -222,6 +226,12 @@ def read(self):
return self.content if hasattr(self, 'content') else self.br.contents

def wait_for_selector(self, selector, **kwargs):
self.br.wait_for_content(
lambda br: not br.webframe.findFirstElement(selector).isNull(),
**kwargs)
self.br.wait_for_content(lambda br: not br.webframe.findFirstElement(selector).isNull(),
**kwargs)

def add_proxy(self,addr, proxy_type='all',
user=None, password=None):
self.br.set_proxy(addr)

def set_default_timeout(self, timeout):
self._default_timout = timeout
@@ -107,12 +107,12 @@ def run_containers(n_containers, n_instances, working_dir, job_def_path,
is_local=is_local, master_ip=master_ip, task_start_id=acc)
if is_multi_process:
process = multiprocessing.Process(target=container.run,
args=(True, ))
args=(True,))
process.start()
processes.append(process)
else:
thread = threading.Thread(target=container.run,
args=(True, ))
args=(True,))
thread.start()
processes.append(thread)
acc += n_tasks
@@ -138,7 +138,7 @@ def __init__(self, ctx, job_def_path, job_name,
self.job_name = job_name
self.working_dir = working_dir or os.path.join(self.ctx.working_dir,
self.job_name)
self.logger = get_logger(name='cola_job'+str(time.time()))
self.logger = get_logger(name='cola_job' + str(time.time()))
self.job_desc = job_desc or import_job_desc(job_def_path)

self.settings = self.job_desc.settings
@@ -168,14 +168,16 @@ def _register_rpc(self):
self.rpc_server.register_function(self.shutdown, name='shutdown',
prefix=self.prefix)
if self.ctx.is_local_mode:
self.rpc_server.register_function(lambda: [self.job_name, ],
self.rpc_server.register_function(lambda: [self.job_name,],
name='get_jobs')

def init_deduper(self):
deduper_cls = import_module(self.settings.job.components.deduper.cls)

base = 1 if not self.is_bundle else 1000
size = self.job_desc.settings.job.size
if isinstance(size,basestring):
Review comments:
- Add a space after the comma.
- Agreed.
size = len(self.job_desc.settings.job.starts) * 10
capacity = UNLIMIT_BLOOM_FILTER_CAPACITY
if size > 0:
capacity = max(base * size * 10, capacity)
@@ -203,7 +205,7 @@ def init_mq(self):

def _init_function_servers(self):
budget_dir = os.path.join(self.working_dir, 'budget')
budget_cls = BudgetApplyServer if not self.is_multi_process \
budget_cls = BudgetApplyServer if not self.is_multi_process \
else self.manager.budget_server
self.budget_server = budget_cls(budget_dir, self.settings,
None, self.job_name)
@@ -264,8 +266,7 @@ def init(self):
def run(self, block=False):
self.init()
try:
self.processes = run_containers(
self.n_containers, self.n_instances, self.working_dir,
self.processes = run_containers(self.n_containers, self.n_instances, self.working_dir,
self.job_def_path, self.job_name, self.ctx.env, self.mq,
self.counter_arg, self.budget_arg, self.speed_arg,
self.stopped, self.nonsuspend, self.idle_statuses,
@@ -71,8 +71,7 @@ def __init__(self, id_, job_desc, mq,
is_local=False, env=None, logger=None):
self.id_ = id_
self.job_desc = job_desc
self.opener = job_desc.opener_cls(
timeout=DEFAULT_OPENER_TIMEOUT)
self.opener = job_desc.opener_cls(timeout=DEFAULT_OPENER_TIMEOUT)
self.mq = mq
self.dir_ = working_dir
self.settings = job_desc.settings
@@ -116,8 +115,7 @@ def _configure_proxy(self):
for p in proxies:
proxy_type = p.type if p.has('type') else 'all'
if p.has('addr'):
self.opener.add_proxy(
p.addr,
self.opener.add_proxy(p.addr,
proxy_type=proxy_type,
user=p.user if p.has('user') else None,
password=p.password if p.has('password') else None)
@@ -210,7 +208,7 @@ def _pack_error(self, url, msg, error, content=None,

msg_filename = os.path.join(path, ERROR_MSG_FILENAME)
with open(msg_filename, 'w') as f:
f.write(msg+'\n')
f.write(msg + '\n')
traceback.print_exc(file=f)

content_filename = os.path.join(path,
@@ -304,6 +302,8 @@ class UrlExecutor(Executor):
def __init__(self, *args, **kwargs):
super(UrlExecutor, self).__init__(*args, **kwargs)
self.budges = 0
self.url_error_times = {}
Review comments:
- Is this change because url is a str and therefore has no error_times attribute? In principle the change should be fine, but to stay consistent with bundle, the task should check the url's type before handing it to the executor and convert it to a Url object if it is a str.
- url_error_times is used after instantiation but was never initialized before.
- No, my point is that a url's error_times should be recorded on the Url object itself, so the information is preserved when the url is put back into the mq; keeping it on the executor means it is lost once the executor exits. Since the url a user's parser returns is sometimes a str, the executor should convert it to a cola.core.unit.Url object while processing it and record error_times on that object; bundle already works this way.
- Good point — I will fix this in the next PR.

def _parse(self, parser_cls, options, url):
if hasattr(self, 'content'):
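To make the suggestion in the discussion above concrete, here is a minimal sketch (not part of this PR) of how the executor could normalize plain-string urls into cola.core.unit.Url objects and keep error_times on the unit itself, so the retry count survives when the unit is put back into the mq. The assumption that Url takes the url string as its first constructor argument is mine and may not match the real class.

```python
# Sketch only (not PR code): normalize parser output to Url units and
# record error_times on the unit, as suggested in the review.
from cola.core.unit import Url  # import path taken from the review comment

def to_unit(url):
    # user parsers sometimes return plain strings; wrap them in a Url
    if isinstance(url, basestring):  # Python 2, matching the codebase
        url = Url(url)  # assumed: Url(url_string) is a valid constructor call
    if not hasattr(url, 'error_times'):
        url.error_times = 0
    return url

def record_error(unit):
    # the counter lives on the unit, so it is preserved when the unit
    # is put back into the mq for a retry
    unit.error_times += 1
    return unit
```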
@@ -314,14 +314,17 @@ def _parse(self, parser_cls, options, url):
counter=ExecutorCounter(self),
settings=ReadOnlySettings(self.settings),
**options).parse()
return list(res)
if res:
Review comment:
- Avoids the list() call failing when res comes back empty (None).
return list(res)
else:
return list()

def _log_error(self, url, e):
if self.logger:
self.logger.error('Error when handle url: %s' % (str(url)))
self.logger.exception(e)

url.error_times = getattr(url, 'error_times', 0) + 1
self.url_error_times[url] = self.url_error_times.get(url, 0) + 1

self.counter_client.local_inc(self.ip, self.id_,
'error_urls', 1)
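As a standalone illustration of the reviewer's note on the if res guard added above: a parser can hand back None, and list(None) raises a TypeError, so the falsy check is what keeps the empty case from blowing up. This snippet is not code from the PR.

```python
# Standalone illustration (not PR code): guard falsy parser results
# before converting them to a list, since list(None) raises TypeError.
def normalize(res):
    return list(res) if res else []

print(normalize(None))                    # -> []
print(normalize(['http://example.com']))  # -> ['http://example.com']
```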
@@ -333,7 +336,7 @@ def _handle_error(self, url, e, pack=True):

self._log_error(url, e)
retries, span, ignore = self._get_handle_error_params(e)
if url.error_times <= retries:
if self.url_error_times.get(url, 0) <= retries:
self.stopped.wait(span)
return
@@ -350,6 +353,7 @@ def _handle_error(self, url, e, pack=True):
self._error()
raise UnitRetryFailed

Review comment:
- The added blank line is redundant. Same below.
def _clear_error(self, url):
if hasattr(url, 'error_times'):
del url.error_times
@@ -364,7 +368,7 @@ def _parse_with_process_exception(self, parser_cls, options, url):
kw = {'pages': 1, 'secs': t}
self.counter_client.multi_local_inc(self.ip, self.id_, **kw)
self.counter_client.multi_global_inc(**kw)

self._clear_error(url)
self._recover_normal()
@@ -383,7 +387,7 @@ def _parse_with_process_exception(self, parser_cls, options, url):
except Exception, e:
self._handle_error(url, e)

return [url, ]
return [url,]

def execute(self, url, is_inc=False):
failed = False
@@ -402,20 +406,23 @@ def execute(self, url, is_inc=False):
parser_cls, options = self.job_desc.url_patterns.get_parser(url, options=True)
if parser_cls is not None:
if rates == 0:
rates, span = self.speed_client.require(
DEFAULT_SPEEED_REQUIRE_SIZE)
rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE)
if rates == 0:
if self.stopped.wait(5):
return
rates -= 1

try:
next_urls = self._parse_with_process_exception(
parser_cls, options, url)
next_urls = self._parse_with_process_exception(parser_cls, options, url)
next_urls = list(self.job_desc.url_patterns.matches(next_urls))

if next_urls:
self.mq.put(next_urls)
# inc budget if auto budget enabled
if self.settings.job.size == 'auto':
inc_budgets = len(next_urls)
if inc_budgets > 0:
self.budget_client.inc_budgets(inc_budgets)
Review comments:
- Here, if next_urls contains urls that have already been crawled, len(next_urls) no longer equals the number of urls actually waiting to be crawled, does it?
- You're right, I hadn't considered that; it seems we would need to know how many urls the mq's put method really stored.
- That part is tricky, because MessageQueueNodeProxy does not actually enqueue anything on put; it keeps a cache for each remote worker and only flushes when the cache fills up, which reduces network transfer overhead. So put currently has no way of knowing how many items were truly stored, and deduplication only happens on each mq node. Does having more budgets than urls actually crawled keep the job from finishing promptly? If not, I think this spot can stay as it is.
- Yes, if the budget count exceeds the number of urls actually crawled, the job stays in a waiting state indefinitely. There doesn't seem to be a good way around it; when duplicate URLs show up, the only option is the run_loca_job approach — end the job automatically once it has been idle past the timeout, right?
- Yes, I think that is the only option for now.

if hasattr(self.opener, 'close'):
self.opener.close()
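The thread above ends with both sides agreeing that, since MessageQueueNodeProxy cannot report how many urls were really enqueued, an oversized auto budget has to be caught by the job's idle timeout instead. Below is a minimal, hypothetical sketch of that fallback; the names IDLE_TIMEOUT, idle_statuses and shutdown are assumptions loosely modeled on identifiers visible in this diff, not the project's actual API.

```python
# Hypothetical sketch of an idle-timeout watchdog: if every executor has
# reported idle for longer than IDLE_TIMEOUT, stop the job even though
# unspent (over-counted) budgets remain.
import time

IDLE_TIMEOUT = 60  # seconds; illustrative value only


def watch_idle(idle_statuses, shutdown, stopped, poll_interval=5):
    idle_since = None
    while not stopped.is_set():
        if all(idle_statuses):              # every executor reports idle
            if idle_since is None:
                idle_since = time.time()
            elif time.time() - idle_since > IDLE_TIMEOUT:
                shutdown()                  # end the job despite leftover budgets
                return
        else:
            idle_since = None               # activity resumed, reset the clock
        time.sleep(poll_interval)
```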
@@ -458,8 +465,7 @@ def _parse(self, parser_cls, options, bundle, url):

def _log_error(self, bundle, url, e):
if self.logger:
self.logger.error('Error when handle bundle: %s, url: %s' % (
str(bundle), str(url)))
self.logger.error('Error when handle bundle: %s, url: %s' % (str(bundle), str(url)))
self.logger.exception(e)
if url == getattr(bundle, 'error_url', None):
bundle.error_times = getattr(bundle, 'error_times', 0) + 1
@@ -499,6 +505,9 @@ def _handle_error(self, bundle, url, e, pack=True):

if ignore:
bundle.error_urls.append(url)
# dec budget if auto budget enabled
if self.settings.job.size == 'auto':
self.budget_client.dec_budgets(1)
return
else:
bundle.current_urls.insert(0, url)
@@ -525,6 +534,7 @@ def _parse_with_process_exception(self, parser_cls, options,
self.counter_client.multi_local_inc(self.ip, self.id_, **kw)
self.counter_client.multi_global_inc(**kw)

self._clear_error(bundle)
self._recover_normal()
@@ -543,7 +553,7 @@ def _parse_with_process_exception(self, parser_cls, options,
except Exception, e:
self._handle_error(bundle, url, e)

return [url, ], []
return [url,], []

def execute(self, bundle, max_sec, is_inc=False):
failed = False
@@ -565,25 +575,22 @@ def execute(self, bundle, max_sec, is_inc=False):

url = bundle.current_urls.pop(0)
if self.logger:
self.logger.debug('get %s url: %s' %
(bundle.label, url))
self.logger.debug('get %s url: %s' % (bundle.label, url))

rates = 0
span = 0.0
parser_cls, options = self.job_desc.url_patterns.get_parser(url,
options=True)
if parser_cls is not None:
if rates == 0:
rates, span = self.speed_client.require(
DEFAULT_SPEEED_REQUIRE_SIZE)
rates, span = self.speed_client.require(DEFAULT_SPEEED_REQUIRE_SIZE)
if rates == 0:
if self.stopped.wait(5):
break
rates -= 1

try:
next_urls, bundles = self._parse_with_process_exception(
parser_cls, options, bundle, url)
next_urls, bundles = self._parse_with_process_exception(parser_cls, options, bundle, url)
next_urls = list(self.job_desc.url_patterns.matches(next_urls))
next_urls.extend(bundle.current_urls)
if self.shuffle_urls:
@@ -597,6 +604,12 @@ def execute(self, bundle, max_sec, is_inc=False):

if bundles:
self.mq.put(bundles)
# inc budget if auto budget enabled
if self.settings.job.size == 'auto':
inc_budgets = len(bundles)
if inc_budgets > 0:
self.budget_client.inc_budgets(inc_budgets)

if hasattr(self.opener, 'close'):
self.opener.close()
Review comment:
- Yes, this is an obvious mistake. A space is needed after the comma.