-
Notifications
You must be signed in to change notification settings - Fork 217
[WIP] Added Cassandra-Backend #128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 6 commits
4f9c92a
125622e
7bb2605
6238355
039fd0e
667e65c
559400e
4ffe038
2e95323
c53c3f7
e190256
b1bf7bc
2019a71
ed6b561
fef1d83
5a0967f
29a0bdd
2919e79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
from __future__ import absolute_import | ||
from cassandra.cluster import Cluster | ||
from cassandra.cqlengine import connection | ||
from cassandra.query import dict_factory | ||
from cassandra.cqlengine.management import sync_table | ||
from cassandra.cqlengine.management import drop_table | ||
from frontera.core.components import DistributedBackend | ||
from frontera.contrib.backends import CommonBackend | ||
from frontera.contrib.backends.cassandra.components import Metadata, Queue, States | ||
from frontera.utils.misc import load_object | ||
|
||
|
||
class CassandraBackend(CommonBackend):
    """Cassandra-backed frontier backend built on ``CommonBackend``.

    On construction it connects to the configured cluster, optionally creates
    the keyspace, optionally drops and then syncs every configured model
    table, and builds the metadata, states and queue components.
    """

    def __init__(self, manager):
        """Connect to Cassandra and set up tables and components.

        :param manager: frontier manager; its ``settings`` object supplies
            every ``CASSANDRABACKEND_*`` value read below.
        """
        self.manager = manager
        settings = manager.settings
        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')  # Format: ['192.168.0.1', '192.168.0.2']
        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
        models = settings.get('CASSANDRABACKEND_MODELS')
        crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')

        self.cluster = Cluster(cluster_ips, cluster_port)
        # Settings map a model name to a dotted path; resolve each to a class.
        self.models = {name: load_object(klass) for name, klass in models.items()}

        self.session_cls = self.cluster.connect()
        self.session_cls.row_factory = dict_factory
        # Encode Python dicts as CQL map collections when binding values.
        self.session_cls.encoder.mapping[dict] = self.session_cls.encoder.cql_encode_map_collection
        self.crawl_id = crawl_id

        if keyspace_create:
            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
            self.session_cls.execute(query)

        self.session_cls.set_keyspace(keyspace)

        # Make cqlengine (sync_table / drop_table) use this session.
        connection.set_session(self.session_cls)

        # ``.values()`` works on both Python 2 and 3 (``iteritems`` is
        # Python 2 only) and the model name is not needed here.
        if drop_all_tables:
            for model in self.models.values():
                drop_table(model)

        for model in self.models.values():
            sync_table(model)

        self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
                                  settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id=self.crawl_id)
        self._states = States(self.session_cls, self.models['StateModel'],
                              settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id=self.crawl_id)
        self._queue = self._create_queue(settings)

    def frontier_stop(self):
        """Flush the states component, then shut down the Cassandra session."""
        self.states.flush()
        self.session_cls.shutdown()

    def _create_queue(self, settings):
        """Build the queue component; subclasses override this to change ordering.

        :param settings: settings object providing ``SPIDER_FEED_PARTITIONS``.
        :returns: a ``Queue`` bound to this backend's session and crawl id.
        """
        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
                     crawl_id=self.crawl_id)

    @property
    def queue(self):
        """Queue component created in ``__init__``."""
        return self._queue

    @property
    def metadata(self):
        """Metadata component created in ``__init__``."""
        return self._metadata

    @property
    def states(self):
        """States component created in ``__init__``."""
        return self._states
|
||
|
||
class FIFOBackend(CassandraBackend):
    """Cassandra backend whose queue is ordered by ``'created'``."""

    component_name = 'Cassandra FIFO Backend'

    def _create_queue(self, settings):
        """Build a queue ordered by ``'created'``.

        ``crawl_id`` is forwarded so the queue uses the same crawl id as the
        metadata and states components built by ``CassandraBackend.__init__``.

        :param settings: settings object providing ``SPIDER_FEED_PARTITIONS``.
        """
        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
                     ordering='created', crawl_id=self.crawl_id)
|
||
|
||
class LIFOBackend(CassandraBackend):
    """Cassandra backend whose queue is ordered by ``'created_desc'``."""

    component_name = 'Cassandra LIFO Backend'

    def _create_queue(self, settings):
        """Build a queue ordered by ``'created_desc'``.

        ``crawl_id`` is forwarded so the queue uses the same crawl id as the
        metadata and states components built by ``CassandraBackend.__init__``.

        :param settings: settings object providing ``SPIDER_FEED_PARTITIONS``.
        """
        return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
                     ordering='created_desc', crawl_id=self.crawl_id)
|
||
# Short module-level aliases for the backend classes. | ||
# NOTE(review): BASE points at the imported CommonBackend, not CassandraBackend - confirm this is intended. | ||
BASE = CommonBackend | ||
LIFO = LIFOBackend | ||
FIFO = FIFOBackend | ||
|
||
|
||
class Distributed(DistributedBackend):
    """Distributed Cassandra backend.

    ``__init__`` only opens the session and selects the keyspace; the
    ``strategy_worker`` and ``db_worker`` constructors then create the tables
    and components each worker type needs.
    """

    def __init__(self, manager):
        """Connect to the cluster and select the keyspace; no components are built.

        :param manager: frontier manager; its ``settings`` object supplies
            every ``CASSANDRABACKEND_*`` value read below.
        """
        self.manager = manager
        settings = manager.settings
        # NOTE(review): reviewer remarked that the format comment below
        # "isn't needed here anymore".
        cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS')  # Format: ['192.168.0.1', '192.168.0.2']
        cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
        keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
        keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS')  # Default: true
        models = settings.get('CASSANDRABACKEND_MODELS')

        self.cluster = Cluster(cluster_ips, cluster_port)
        # Settings map a model name to a dotted path; resolve each to a class.
        self.models = {name: load_object(klass) for name, klass in models.items()}

        # NOTE(review): "It looks like a session, not a session class. May be
        # worth renaming?" -- name kept so existing users of the attribute
        # keep working.
        self.session_cls = self.cluster.connect()
        self.session_cls.row_factory = dict_factory

        if keyspace_create:
            query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
                WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
            self.session_cls.execute(query)
        self.session_cls.set_keyspace(keyspace)
        # Make cqlengine (sync_table / drop_table) use this session.
        connection.set_session(self.session_cls)

        # Components are filled in by strategy_worker() / db_worker().
        self._metadata = None
        self._queue = None
        self._states = None

    @classmethod
    def strategy_worker(cls, manager):
        """Build a backend for a strategy worker: only the states component.

        Drops the state table first when ``CASSANDRABACKEND_DROP_ALL_TABLES``
        is set, then syncs it.

        :param manager: frontier manager providing ``settings``.
        :returns: a backend instance with ``states`` set.
        """
        b = cls(manager)
        settings = manager.settings
        drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
        model = b.models['StateModel']

        if drop_all_tables:
            drop_table(model)

        sync_table(model)

        b._states = States(b.session_cls, model,
                           settings.get('STATE_CACHE_SIZE_LIMIT'))
        return b

    @classmethod
    def db_worker(cls, manager):
        """Build a backend for a DB worker: metadata and queue components.

        Drops the metadata, queue and crawl-stats tables first when
        ``CASSANDRABACKEND_DROP_ALL_TABLES`` is set, then syncs all three.

        :param manager: frontier manager providing ``settings``.
        :returns: a backend instance with ``metadata`` and ``queue`` set.
        """
        b = cls(manager)
        settings = manager.settings
        drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')

        metadata_m = b.models['MetadataModel']
        queue_m = b.models['QueueModel']
        stats_m = b.models['CrawlStatsModel']
        if drop:
            drop_table(metadata_m)
            drop_table(queue_m)
            drop_table(stats_m)

        sync_table(metadata_m)
        sync_table(queue_m)
        sync_table(stats_m)

        b._metadata = Metadata(b.session_cls, metadata_m,
                               settings.get('CASSANDRABACKEND_CACHE_SIZE'))
        b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
        return b

    @property
    def queue(self):
        """Queue component, or ``None`` if this worker type did not create one."""
        return self._queue

    @property
    def metadata(self):
        """Metadata component, or ``None`` if this worker type did not create one."""
        return self._metadata

    @property
    def states(self):
        """States component, or ``None`` if this worker type did not create one."""
        return self._states

    # NOTE(review): "are you sure all these methods need to be copy/pasted
    # here? IMO, it would be great to extract them to parent class."
    # Author reply: "I give it a try :)"
    def frontier_start(self):
        """Forward ``frontier_start`` to every component that is set."""
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_start()

    def frontier_stop(self):
        """Forward ``frontier_stop`` to every component that is set."""
        for component in [self.metadata, self.queue, self.states]:
            if component:
                component.frontier_stop()

    def add_seeds(self, seeds):
        """Delegate seed insertion to the metadata component."""
        self.metadata.add_seeds(seeds)

    def get_next_requests(self, max_next_requests, **kwargs):
        """Collect the next requests from each requested queue partition.

        :param max_next_requests: limit passed to the queue for each partition.
        :param kwargs: may contain ``partitions`` (defaults to ``[0]``); the
            remaining keyword arguments are forwarded to the queue.
        :returns: list with the requests of all partitions concatenated.
        """
        partitions = kwargs.pop('partitions', [0])  # TODO: Collect from all known partitions
        batch = []
        for partition_id in partitions:
            batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
        return batch

    def page_crawled(self, response, links):
        """Delegate a crawled page and its links to the metadata component."""
        self.metadata.page_crawled(response, links)

    def request_error(self, request, error):
        """Delegate a request error to the metadata component."""
        self.metadata.request_error(request, error)

    def finished(self):
        """Not supported by this backend.

        :raises NotImplementedError: always. The original returned the
            exception class itself, which is a truthy value.
        """
        raise NotImplementedError
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great to include these settings in documentation and move format description there. I know docs are boring, but there is nothing we can do.