Skip to content

[WIP] Added Cassandra-Backend #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .idea/dictionaries/osboxes.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

201 changes: 201 additions & 0 deletions frontera/contrib/backends/cassandra/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from __future__ import absolute_import
from cassandra.cluster import Cluster
from cassandra.cqlengine import connection
from cassandra.query import dict_factory
from cassandra.cqlengine.management import sync_table
from cassandra.cqlengine.management import drop_table
from frontera.core.components import DistributedBackend
from frontera.contrib.backends import CommonBackend
from frontera.contrib.backends.cassandra.components import Metadata, Queue, States
from frontera.utils.misc import load_object


class CassandraBackend(CommonBackend):
def __init__(self, manager):
self.manager = manager
settings = manager.settings
cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS') # Format: ['192.168.0.1', '192.168.0.2']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to include these settings in documentation and move format description there. I know docs are boring, but there is nothing we can do.

cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') # Default: true
models = settings.get('CASSANDRABACKEND_MODELS')
crawl_id = settings.get('CASSANDRABACKEND_CRAWL_ID')

self.cluster = Cluster(cluster_ips, cluster_port)
self.models = dict([(name, load_object(klass)) for name, klass in models.items()])

self.session_cls = self.cluster.connect()
self.session_cls.row_factory = dict_factory
self.session_cls.encoder.mapping[dict] = self.session_cls.encoder.cql_encode_map_collection
self.crawl_id = crawl_id

if keyspace_create:
query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
self.session_cls.execute(query)

self.session_cls.set_keyspace(keyspace)

connection.set_session(self.session_cls)

if drop_all_tables:
for key, value in self.models.iteritems():
drop_table(value)

for key, value in self.models.iteritems():
sync_table(value)

self._metadata = Metadata(self.session_cls, self.models['MetadataModel'],
settings.get('CASSANDRABACKEND_CACHE_SIZE'), crawl_id=self.crawl_id)
self._states = States(self.session_cls, self.models['StateModel'],
settings.get('STATE_CACHE_SIZE_LIMIT'), crawl_id=self.crawl_id)
self._queue = self._create_queue(settings)

def frontier_stop(self):
self.states.flush()
self.session_cls.shutdown()

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
crawl_id=self.crawl_id)

@property
def queue(self):
return self._queue

@property
def metadata(self):
return self._metadata

@property
def states(self):
return self._states


class FIFOBackend(CassandraBackend):
component_name = 'Cassandra FIFO Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
ordering='created')


class LIFOBackend(CassandraBackend):
component_name = 'Cassandra LIFO Backend'

def _create_queue(self, settings):
return Queue(self.session_cls, self.models['QueueModel'], settings.get('SPIDER_FEED_PARTITIONS'),
ordering='created_desc')

BASE = CommonBackend
LIFO = LIFOBackend
FIFO = FIFOBackend


class Distributed(DistributedBackend):
def __init__(self, manager):
self.manager = manager
settings = manager.settings
cluster_ips = settings.get('CASSANDRABACKEND_CLUSTER_IPS') # Format: ['192.168.0.1', '192.168.0.2']
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment isn't needed here anymore

cluster_port = settings.get('CASSANDRABACKEND_CLUSTER_PORT')
keyspace = settings.get('CASSANDRABACKEND_KEYSPACE')
keyspace_create = settings.get('CASSANDRABACKEND_CREATE_KEYSPACE_IF_NOT_EXISTS') # Default: true
models = settings.get('CASSANDRABACKEND_MODELS')

self.cluster = Cluster(cluster_ips, cluster_port)
self.models = dict([(name, load_object(klass)) for name, klass in models.items()])

self.session_cls = self.cluster.connect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a session, not a session class. May be worth renaming?

self.session_cls.row_factory = dict_factory

if keyspace_create:
query = """CREATE KEYSPACE IF NOT EXISTS \"%s\"
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}""" % (keyspace, )
self.session_cls.execute(query)
self.session_cls.set_keyspace(keyspace)
connection.set_session(self.session_cls)

self._metadata = None
self._queue = None
self._states = None

@classmethod
def strategy_worker(cls, manager):
b = cls(manager)
settings = manager.settings
drop_all_tables = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')
model = b.models['StateModel']

if drop_all_tables:
drop_table(model)

sync_table(model)

b._states = States(b.session_cls, model,
settings.get('STATE_CACHE_SIZE_LIMIT'))
return b

@classmethod
def db_worker(cls, manager):
b = cls(manager)
settings = manager.settings
drop = settings.get('CASSANDRABACKEND_DROP_ALL_TABLES')

metadata_m = b.models['MetadataModel']
queue_m = b.models['QueueModel']
stats_m = b.models['CrawlStatsModel']
if drop:
drop_table(metadata_m)
drop_table(queue_m)
drop_table(stats_m)

sync_table(metadata_m)
sync_table(queue_m)
sync_table(stats_m)

b._metadata = Metadata(b.session_cls, metadata_m,
settings.get('CASSANDRABACKEND_CACHE_SIZE'))
b._queue = Queue(b.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
return b

@property
def queue(self):
return self._queue

@property
def metadata(self):
return self._metadata

@property
def states(self):
return self._states

def frontier_start(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure all these methods need to be copy/pasted here? IMO, it would be great to extract them to parent class.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I give it a try :)

for component in [self.metadata, self.queue, self.states]:
if component:
component.frontier_start()

def frontier_stop(self):
for component in [self.metadata, self.queue, self.states]:
if component:
component.frontier_stop()

def add_seeds(self, seeds):
self.metadata.add_seeds(seeds)

def get_next_requests(self, max_next_requests, **kwargs):
partitions = kwargs.pop('partitions', [0]) # TODO: Collect from all known partitions
batch = []
for partition_id in partitions:
batch.extend(self.queue.get_next_requests(max_next_requests, partition_id, **kwargs))
return batch

def page_crawled(self, response, links):
self.metadata.page_crawled(response, links)

def request_error(self, request, error):
self.metadata.request_error(request, error)

def finished(self):
return NotImplementedError
Loading