diff --git a/README.rst b/README.rst index 0ab2771..dd13408 100644 --- a/README.rst +++ b/README.rst @@ -158,6 +158,8 @@ Common parameters: migrations are run against. This option must be used in conjuction with the -k option. This option is ignored unless the -s option is provided. + --protocol-version PROTOCOL_VERSION + Protocol version used to connect to Cassandra. -y, --assume-yes Automatically answer "yes" for all questions migrate diff --git a/cassandra_migrate/cli.py b/cassandra_migrate/cli.py index 634b45a..0754761 100644 --- a/cassandra_migrate/cli.py +++ b/cassandra_migrate/cli.py @@ -70,6 +70,9 @@ def main(): migrations are run against. This option must be used in conjuction with the -k option. This option is ignored unless the -s option is provided.""") + parser.add_argument('--protocol-version', type=int, default=None, + choices=range(1, 6), + help='Protocol version used to connect to Cassandra.') parser.add_argument('-y', '--assume-yes', action='store_true', help='Automatically answer "yes" for all questions') @@ -139,7 +142,8 @@ def main(): user=opts.user, password=opts.password, host_cert_path=opts.ssl_cert, client_key_path=opts.ssl_client_private_key, - client_cert_path=opts.ssl_client_cert) as migrator: + client_cert_path=opts.ssl_client_cert, + protocol_version=opts.protocol_version) as migrator: cmd_method = getattr(migrator, opts.action) if not callable(cmd_method): print('Error: invalid command', file=sys.stderr) diff --git a/cassandra_migrate/migrator.py b/cassandra_migrate/migrator.py index 2db6d9e..6c4f78b 100644 --- a/cassandra_migrate/migrator.py +++ b/cassandra_migrate/migrator.py @@ -53,6 +53,12 @@ VALUES (%s, %s, %s, %s, %s, %s, toTimestamp(now())) IF NOT EXISTS """ +CREATE_DB_VERSION_PROTOCOL_3 = """ +INSERT INTO "{keyspace}"."{table}" +(id, version, name, content, checksum, state, applied_at) +VALUES (%s, %s, %s, %s, %s, %s, dateof(now())) IF NOT EXISTS +""" + FINALIZE_DB_VERSION = """ UPDATE "{keyspace}"."{table}" SET state = %s WHERE id = %s IF state = %s """ @@ -116,7 +122,8 @@ class Migrator(object): def __init__(self, config, profile='dev', hosts=['127.0.0.1'], port=9042, user=None, password=None, host_cert_path=None, - client_key_path=None, client_cert_path=None): + client_key_path=None, client_cert_path=None, + protocol_version=None): self.config = config try: @@ -137,14 +144,21 @@ def __init__(self, config, profile='dev', hosts=['127.0.0.1'], port=9042, else: ssl_options = None - self.cluster = Cluster( - contact_points=hosts, - port=port, - auth_provider=auth_provider, - max_schema_agreement_wait=300, - control_connection_timeout=10, - connect_timeout=30, - ssl_options=ssl_options) + cluster_kwargs = { + "contact_points": hosts, + "port": port, + "auth_provider": auth_provider, + "max_schema_agreement_wait": 300, + "control_connection_timeout": 10, + "connect_timeout": 30, + "ssl_options": ssl_options, + } + # Cluster defaults `protocol_version` to `cluster._NOT_SET` + # Only pass it to `Cluster` if it has been set, otherwise let it use + # its default + if protocol_version is not None: + cluster_kwargs["protocol_version"] = protocol_version + self.cluster = Cluster(**cluster_kwargs) self._session = None @@ -377,8 +391,15 @@ def _create_version(self, version, migration): version, migration)) version_id = uuid.uuid4() + + # Cassandra versions below 2.2 use protocol 3 and use `dateof` instead + # of `toTimestamp` + if self.cluster.protocol_version <= 3: + statement = CREATE_DB_VERSION_PROTOCOL_3 + else: + statement = CREATE_DB_VERSION result = self._execute( - self._q(CREATE_DB_VERSION), + self._q(statement), (version_id, version, migration.name, migration.content, bytearray(migration.checksum), Migration.State.IN_PROGRESS))