Source code for track.distributed.cockroachdb

import os
import time
import subprocess
import traceback
import shutil

from multiprocessing import Process, Manager

from track.utils.log import error, warning, info, debug
from track.versioning import compute_version

import signal


# Version of the bundled cockroach binaries.
VERSION = '19.1.1'

# Expected `compute_version` digest of each bundled binary, keyed by platform.
COCKROACH_HASH = dict(
    posix='051b9f3afd3478b62e3fce0d140df6f091b4a1e4ef84f05c3f1c3588db2495fa',
    macos='ec1fe3dfb55c67b74c3f04c15d495a55966b930bb378b416a5af5f877fb093de',
)

# Directory containing this module; the bundled binaries live beneath it.
_base = os.path.dirname(os.path.realpath(__file__))

# Path of the bundled cockroach binary, keyed by platform. When the file is
# missing, CockRoachDB falls back to a `cockroach` executable on PATH.
COCKROACH_BIN = dict(
    posix=f'{_base}/cockroach/cockroach_linux',
    macos=f'{_base}/cockroach/cockroach_macos',
)


class CockRoachDB:
    """Spawn and manage a single local CockroachDB node.

    CockroachDB is a highly resilient database that allows us to remove the
    master in a traditional distributed setup. This class starts a cockroach
    node that stores its data in ``location``. The node's stdout is parsed in
    a background :class:`multiprocessing.Process`, and each ``key: value``
    line is stored in ``self.properties``, a Manager dict shared with that
    process. :attr:`node_id`, :attr:`sql`, etc. read from it.

    Args:
        location: directory holding the store plus the ``logs``, ``tmp`` and
            ``extern`` sub-directories (created if missing).
        addrs: passed as ``--listen-addr`` to the node and as ``--host`` to
            the client commands run by :meth:`_setup`.
        join: optional ``--join`` value for joining an existing cluster.
        clean_on_exit: when true, :meth:`stop` deletes ``location``.

    Raises:
        RuntimeError: if the current platform has no entry in COCKROACH_BIN.
    """

    def __init__(self, location, addrs, join=None, clean_on_exit=True):
        logs = f'{location}/logs'
        temp = f'{location}/tmp'
        external = f'{location}/extern'

        os.makedirs(logs, exist_ok=True)
        os.makedirs(temp, exist_ok=True)
        os.makedirs(external, exist_ok=True)

        self.location = location
        self.addrs = addrs

        platform_key = self._platform_key()
        self.bin = COCKROACH_BIN.get(platform_key)
        if self.bin is None:
            raise RuntimeError('Your OS is not supported')

        if not os.path.exists(self.bin):
            info('Using system binary')
            self.bin = 'cockroach'
        else:
            expected_hash = COCKROACH_HASH.get(platform_key)
            if compute_version([self.bin]) != expected_hash:
                warning('Binary Hashes do not match')

        self.arguments = [
            'start',
            '--insecure',
            f'--listen-addr={addrs}',
            f'--external-io-dir={external}',
            f'--store={location}',
            f'--temp-dir={temp}',
            f'--log-dir={logs}',
            f'--pid-file={location}/cockroach_pid',
        ]
        if join is not None:
            self.arguments.append(f'--join={join}')

        self.manager = Manager()
        self.properties = self.manager.dict()
        self.properties['running'] = False
        self.clean_on_exit = clean_on_exit
        self._process = None
        # Human-readable command line; only used in log/error messages.
        self.cmd = ' '.join([self.bin] + self.arguments)

    @staticmethod
    def _platform_key():
        """Return the key used to index COCKROACH_BIN and COCKROACH_HASH.

        ``os.name`` is ``'posix'`` on both Linux and macOS, so on its own it
        can never select the ``'macos'`` entries; ``sys.platform`` tells them
        apart. Other platforms return ``os.name`` unchanged.
        """
        import sys

        if sys.platform == 'darwin':
            return 'macos'
        return os.name

    def _start(self, properties):
        """Run the cockroach node and parse its output (child-process target).

        ``properties['running']`` doubles as a stop flag that :meth:`stop`
        clears. ``readline`` blocks, so the flag is only re-checked after the
        node prints another line or exits. On exit the return code is stored
        in ``properties['exit']``.
        """
        # NOTE(review): with the 'spawn' start method, `self` (including the
        # Manager) must be picklable to run this target; confirm on macOS.
        try:
            # Argument list and no shell: paths containing spaces survive, and
            # proc.pid is cockroach itself rather than /bin/sh. Text mode is
            # required for bufsize=1 (line buffering) to take effect.
            proc = subprocess.Popen(
                [self.bin] + self.arguments,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=1,
                encoding='utf-8',
                errors='replace',
            )
        except OSError:
            # e.g. no `cockroach` binary on PATH
            error(traceback.format_exc())
            properties['running'] = False
            return

        with proc:
            try:
                properties['running'] = True
                properties['pid'] = proc.pid

                while properties['running']:
                    if proc.poll() is None:
                        line = proc.stdout.readline()
                        if line:
                            self.parse(properties, line)
                    else:
                        properties['running'] = False
                        properties['exit'] = proc.returncode

            except Exception:
                error(traceback.format_exc())

    def start(self, wait=True):
        """Launch the node in a background process.

        Args:
            wait: when true, block until the node reports its ``nodeID``,
                record the server pid from the pid file in
                ``properties['db_pid']``, and create the ``track`` database
                (see :meth:`_setup`).

        Raises:
            RuntimeError: if ``wait`` is true and the node exits before it
                reports a ``nodeID``.
        """
        self._process = Process(target=self._start, args=(self.properties,))
        self._process.start()

        if not wait:
            return

        # wait for all the properties to be populated
        while self.properties.get('nodeID') is None and self._process.is_alive():
            time.sleep(0.01)

        if self.properties.get('nodeID') is None:
            raise RuntimeError(
                f'cockroach exited before it was ready '
                f'(exit code: {self.properties.get("exit")}, cmd: {self.cmd})'
            )

        self.properties['db_pid'] = self._read_pid()
        self._setup()

    def _read_pid(self):
        """Return the server pid that cockroach wrote to its ``--pid-file``."""
        with open(f'{self.location}/cockroach_pid', 'r') as pid_file:
            return int(pid_file.read())

    def _setup(self, client='track_client'):
        """Create the SQL user ``client`` and the ``track`` database/tables.

        Raises:
            subprocess.CalledProcessError: if a cockroach client command fails.
        """
        out = subprocess.check_output(
            [self.bin, 'user', 'set', client, '--insecure', f'--host={self.addrs}']
        )
        debug(out.decode('utf8').strip())

        # `client` is interpolated into the SQL below; it is expected to be a
        # trusted identifier (the default is a constant).
        create_db = f"""
            CREATE DATABASE IF NOT EXISTS track;
            SET DATABASE = track;
            GRANT ALL ON DATABASE track TO {client};

            CREATE TABLE IF NOT EXISTS track.projects (
                uid BYTES PRIMARY KEY,
                name STRING,
                description STRING,
                metadata JSONB,
                trial_groups BYTES[],
                trials BYTES[]
            );

            CREATE TABLE IF NOT EXISTS track.trial_groups (
                uid BYTES PRIMARY KEY,
                name STRING,
                description STRING,
                metadata JSONB,
                trials BYTES[],
                project_id BYTES
            );

            CREATE TABLE IF NOT EXISTS track.trials (
                uid BYTES,
                hash BYTES,
                revision SMALLINT,
                name STRING,
                description STRING,
                tags JSONB,
                version BYTES,
                group_id BYTES,
                project_id BYTES,
                parameters JSONB,
                metadata JSONB,
                metrics JSONB,
                chronos JSONB,
                status JSONB,
                errors JSONB,
                PRIMARY KEY (hash, revision)
            );""".encode('utf8')

        out = subprocess.check_output(
            [self.bin, 'sql', '--insecure', f'--host={self.addrs}'],
            input=create_db,
        )
        debug(out.decode('utf8').strip())

    def stop(self):
        """Stop the node and, if ``clean_on_exit``, delete ``location``.

        Safe to call when :meth:`start` was never called, when it was called
        with ``wait=False``, or when the server has already exited.
        """
        self.properties['running'] = False

        if self._process is not None:
            self._process.join(timeout=5)
            # The monitor may still be blocked in readline(); force it out.
            self._process.terminate()

        pid = self.properties.get('db_pid')
        if pid is None:
            # start(wait=False) never recorded the pid; try the pid file.
            try:
                pid = self._read_pid()
            except (OSError, ValueError):
                pid = None

        if pid is not None:
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # The server already exited on its own.
                pass

        if self.clean_on_exit:
            shutil.rmtree(self.location)

    def wait(self):
        """Block the caller forever, e.g. until a KeyboardInterrupt."""
        while True:
            time.sleep(0.01)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
        # Returning False lets any in-flight exception propagate unchanged
        # (original instance and traceback).
        return False

    def parse(self, properties, line):
        """Store one ``key: value`` line of the node's output in ``properties``.

        Blank lines and lines starting with ``*`` (the insecure-mode banner)
        are ignored. The key and value are split at the first ``:`` and
        stripped.

        Raises:
            RuntimeError: if a non-blank line contains no ``:``.
        """
        if not line.strip() or line[0] == '*':
            return

        key, separator, value = line.partition(':')
        if not separator:
            error(f'Could not parse cockroach output: {line!r}')
            raise RuntimeError(f'{line} (cmd: {self.cmd})')

        properties[key.strip()] = value.strip()

    # properties that are populated once the server has started
    @property
    def node_id(self):
        """Value of the ``nodeID`` output line, or None before startup."""
        return self.properties.get('nodeID')

    @property
    def status(self):
        """Value of the ``status`` output line, or None."""
        return self.properties.get('status')

    @property
    def sql(self):
        """Value of the ``sql`` output line, or None."""
        return self.properties.get('sql')

    @property
    def client_flags(self):
        """Value of the ``client flags`` output line, or None."""
        return self.properties.get('client flags')

    @property
    def webui(self):
        """Value of the ``webui`` output line, or None."""
        return self.properties.get('webui')

    @property
    def build(self):
        """Value of the ``build`` output line, or None."""
        return self.properties.get('build')