Merge pull request #361 from pigmej/solard_separate_repo

solard renamed to solar-agent and moved to separate repo
This commit is contained in:
Łukasz Oleś 2015-11-25 12:49:55 +01:00
commit c1476d37d8
25 changed files with 67 additions and 1090 deletions

View File

@ -19,7 +19,8 @@ RUN apt-get install -y libffi-dev libssl-dev
RUN pip install riak peewee
RUN pip install -U setuptools>=17.1
RUN cd /solar && python setup.py install
RUN cd /solard && python setup.py install
RUN pip install git+git://github.com/Mirantis/solar-agent.git
RUN ansible-playbook -v -i "localhost," -c local /celery.yaml --skip-tags slave
CMD ["/run.sh"]

View File

@ -5,7 +5,7 @@
tasks:
# Setup development env for solar
- shell: pip install -e . chdir=/vagrant
- shell: pip install -e . chdir=/vagrant/solard
- shell: pip install git+git://github.com/Mirantis/solar-agent.git
- hosts: all
tasks:

View File

@ -206,26 +206,26 @@ def setup_haproxies():
@click.command()
@click.argument('i', type=int, required=True)
def add_solard(i):
solard_transport = vr.create('solard_transport%s' % i, 'resources/transport_solard',
{'solard_user': 'vagrant',
'solard_password': 'password'})[0]
def add_solar_agent(i):
solar_agent_transport = vr.create('solar_agent_transport%s' % i, 'resources/transport_solar_agent',
{'solar_agent_user': 'vagrant',
'solar_agent_password': 'password'})[0]
transports = resource.load('transports%s' % i)
ssh_transport = resource.load('ssh_transport%s' % i)
transports_for_solard = vr.create('transports_for_solard%s' % i, 'resources/transports')[0]
transports_for_solar_agent = vr.create('transports_for_solar_agent%s' % i, 'resources/transports')[0]
# install solard with ssh
signals.connect(transports_for_solard, solard_transport, {})
# install solar_agent with ssh
signals.connect(transports_for_solar_agent, solar_agent_transport, {})
signals.connect(ssh_transport, transports_for_solard, {'ssh_key': 'transports:key',
signals.connect(ssh_transport, transports_for_solar_agent, {'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
# add solard to transports on this node
signals.connect(solard_transport, transports, {'solard_user': 'transports:user',
'solard_port': 'transports:port',
'solard_password': 'transports:password',
# add solar_agent to transports on this node
signals.connect(solar_agent_transport, transports, {'solar_agent_user': 'transports:user',
'solar_agent_port': 'transports:port',
'solar_agent_password': 'transports:password',
'name': 'transports:name'})
@ -252,7 +252,7 @@ def undeploy():
main.add_command(deploy)
main.add_command(undeploy)
main.add_command(add_haproxies)
main.add_command(add_solard)
main.add_command(add_solar_agent)
if __name__ == '__main__':

View File

@ -17,18 +17,18 @@ def run():
})[0]
transports = vr.create('transports_node1', 'resources/transports')[0]
transports_for_solard = vr.create('transports_for_solard', 'resources/transports')[0]
transports_for_solar_agent = vr.create('transports_for_solar_agent', 'resources/transports')[0]
ssh_transport = vr.create('ssh_transport', 'resources/transport_ssh',
{'ssh_key': '/vagrant/.vagrant/machines/solar-dev1/virtualbox/private_key',
'ssh_user': 'vagrant'})[0]
solard_transport = vr.create('solard_transport', 'resources/transport_solard',
{'solard_user': 'vagrant',
'solard_password': 'password'})[0]
solar_agent_transport = vr.create('solar_agent_transport', 'resources/transport_solar_agent',
{'solar_agent_user': 'vagrant',
'solar_agent_password': 'password'})[0]
transports_for_solard.connect(solard_transport, {})
ssh_transport.connect(transports_for_solard,{'ssh_key': 'transports:key',
transports_for_solar_agent.connect(solar_agent_transport, {})
ssh_transport.connect(transports_for_solar_agent,{'ssh_key': 'transports:key',
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
@ -40,9 +40,9 @@ def run():
'ssh_user': 'transports:user',
'ssh_port': 'transports:port',
'name': 'transports:name'})
solard_transport.connect(transports, {'solard_user': 'transports:user',
'solard_port': 'transports:port',
'solard_password': 'transports:password',
solar_agent_transport.connect(transports, {'solar_agent_user': 'transports:user',
'solar_agent_port': 'transports:port',
'solar_agent_password': 'transports:password',
'name': 'transports:name'})

View File

@ -0,0 +1,7 @@
- hosts: [{{ host }}]
sudo: yes
tasks:
- shell: pip install git+git://github.com/Mirantis/solar-agent.git
- shell: start-stop-daemon --stop --make-pidfile --pidfile /tmp/solar_agent.pid --startas /bin/bash -- -c "exec /usr/local/bin/solar_agent run --port {{solar_agent_port}} --base tcp > /tmp/solar_agent.log 2>&1"
ignore_errors: True
- shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solar_agent.pid --startas /bin/bash -- -c "exec /usr/local/bin/solar_agent run --port {{solar_agent_port}} --base tcp > /tmp/solar_agent.log 2>&1"

View File

@ -0,0 +1,7 @@
- hosts: [{{ host }}]
sudo: yes
tasks:
- shell: pip install git+git://github.com/Mirantis/solar-agent.git
- shell: start-stop-daemon --stop --make-pidfile --pidfile /tmp/solar_agent.pid --startas /bin/bash -- -c "exec /usr/local/bin/solar_agent run --port {{solar_agent_port}} --base tcp > /tmp/solar_agent.log 2>&1"
ignore_errors: True
- shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solar_agent.pid --startas /bin/bash -- -c "exec /usr/local/bin/solar_agent run --port {{solar_agent_port}} --base tcp > /tmp/solar_agent.log 2>&1"

View File

@ -1,21 +1,21 @@
id: transport_solard
id: transport_solar_agent
handler: ansible
input:
solard_user:
solar_agent_user:
schema: str!
value:
solard_password:
solar_agent_password:
schema: str!
value:
# solard_transport_class:
# solar_agent_transport_class:
# schema: str!
# value:
solard_port:
solar_agent_port:
schema: int!
value: 5555
name:
schema: str!
value: solard
value: solar_agent
location_id:
schema: str
value:

View File

@ -1,7 +0,0 @@
- hosts: [{{ host }}]
sudo: yes
tasks:
- shell: pip install -e /vagrant/solard
- shell: start-stop-daemon --stop --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec /usr/local/bin/solard run --port {{solard_port}} --base tcp > /tmp/solard.log 2>&1"
ignore_errors: True
- shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec /usr/local/bin/solard run --port {{solard_port}} --base tcp > /tmp/solard.log 2>&1"

View File

@ -1,7 +0,0 @@
- hosts: [{{ host }}]
sudo: yes
tasks:
- shell: pip install -e /vagrant/solard
- shell: start-stop-daemon --stop --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec /usr/local/bin/solard run --port {{solard_port}} --base tcp > /tmp/solard.log 2>&1"
ignore_errors: True
- shell: start-stop-daemon -b --start --make-pidfile --pidfile /tmp/solard.pid --chdir /vagrant/solard --startas /bin/bash -- -c "exec /usr/local/bin/solard run --port {{solard_port}} --base tcp > /tmp/solard.log 2>&1"

4
run.sh
View File

@ -3,10 +3,6 @@
# required for ease of development
python setup.py develop
pushd /solard
python setup.py develop
popd
#used only to start celery on docker
ansible-playbook -v -i "localhost," -c local /celery.yaml --skip-tags slave

View File

@ -17,7 +17,7 @@ import handlers
# from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
# from solar.core.transports.rsync import RsyncSyncTransport
# from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport
# from solar.core.transports.solar_agent_transport import SolarAgentRunTransport, SolarAgentSyncTransport
from solar.core.transports.bat import BatRunTransport, BatSyncTransport
@ -25,8 +25,8 @@ _default_transports = {
# 'sync': RsyncSyncTransport,
# 'sync': SSHSyncTransport,
# 'run': SSHRunTransport,
# 'run': SolardRunTransport,
# 'sync': SolardSyncTransport
# 'run': SolarAgentRunTransport,
# 'sync': SolarAgentSyncTransport
'run': BatRunTransport,
'sync': BatSyncTransport
}

View File

@ -24,7 +24,7 @@ from solar import errors
env.warn_only = True
# if we would have something like solard that would render this then
# if we would have something like solar_agent that would render this then
# we would not need to render it there
# for now we redender it locally, sync to remote, run ansible on remote host as local
class AnsibleTemplate(TempFileHandler):

View File

@ -91,7 +91,7 @@ def location_and_transports(emitter, receiver, orig_mapping):
return
if emitter_single.get('is_emit') is False:
# this case is when we connect resource to transport itself
# like adding ssh_transport for solard_transport and we don't want then
# like adding ssh_transport for solar_agent_transport and we don't want then
# transports_id to be messed
# it forbids passing this value around
# log.debug("Disabled %r mapping for %r", single, emitter.name)

View File

@ -2,11 +2,11 @@ from solar.core.transports.base import SyncTransport, RunTransport, SolarTranspo
from solar.core.transports.ssh import SSHSyncTransport, SSHRunTransport
from solar.core.transports.rsync import RsyncSyncTransport
try:
from solar.core.transports.solard_transport import SolardRunTransport, SolardSyncTransport
from solar.core.transports.solar_agent_transport import SolarAgentRunTransport, SolarAgentSyncTransport
except ImportError:
_solard_available = False
_solar_agent_available = False
else:
_solard_available = True
_solar_agent_available = True
try:
from solar.core.transports.torrent import TorrentSyncTransport
@ -29,9 +29,9 @@ KNOWN_RUN_TRANSPORTS = {
if _torrent_available:
KNOWN_SYNC_TRANSPORTS['torrent'] = TorrentSyncTransport
if _solard_available:
KNOWN_SYNC_TRANSPORTS['solard'] = SolardSyncTransport
KNOWN_RUN_TRANSPORTS['solard'] = SolardRunTransport
if _solar_agent_available:
KNOWN_SYNC_TRANSPORTS['solar_agent'] = SolarAgentSyncTransport
KNOWN_RUN_TRANSPORTS['solar_agent'] = SolarAgentRunTransport
class OnAll(object):
@ -87,7 +87,7 @@ class BatTransport(SolarTransport):
class BatSyncTransport(SyncTransport, BatTransport):
preffered_transport_name = None
_order = ('torrent', 'solard', 'rsync', 'ssh')
_order = ('torrent', 'solar_agent', 'rsync', 'ssh')
_bat_transports = KNOWN_SYNC_TRANSPORTS
def __init__(self, *args, **kwargs):
@ -105,7 +105,7 @@ class BatSyncTransport(SyncTransport, BatTransport):
class BatRunTransport(RunTransport, BatTransport):
preffered_transport_name = None
_order = ('solard', 'ssh')
_order = ('solar_agent', 'ssh')
_bat_transports = KNOWN_RUN_TRANSPORTS
def __init__(self, *args, **kwargs):

View File

@ -13,13 +13,13 @@
# under the License.
from solard.client import SolardClient
from solar_agent.client import SolarAgentClient
from solar.core.transports.base import RunTransport, SyncTransport, Executor, SolarRunResult
from solar.core.log import log
class SolardTransport(object):
class SolarAgentTransport(object):
def get_client(self, resource):
transport = self.get_transport_data(resource)
@ -28,18 +28,18 @@ class SolardTransport(object):
port = transport['port']
auth = transport['password']
transport_class = transport.get('transport_class')
client = SolardClient(auth={'user': user, 'auth': auth},
client = SolarAgentClient(auth={'user': user, 'auth': auth},
transport_args=(host, port),
transport_class=transport_class)
return client
class SolardSyncTransport(SyncTransport, SolardTransport):
class SolarAgentSyncTransport(SyncTransport, SolarAgentTransport):
preffered_transport_name = 'solard'
preffered_transport_name = 'solar_agent'
def copy(self, resource, _from, _to, use_sudo=False):
log.debug("Solard copy: %s -> %s", _from, _to)
log.debug("SolarAgent copy: %s -> %s", _from, _to)
client = self.get_client(resource)
executor = lambda transport: client.copy(_from, _to, use_sudo)
@ -49,15 +49,15 @@ class SolardSyncTransport(SyncTransport, SolardTransport):
self.executors.append(executor)
class SolardRunTransport(RunTransport, SolardTransport):
class SolarAgentRunTransport(RunTransport, SolarAgentTransport):
preffered_transport_name = 'solard'
preffered_transport_name = 'solar_agent'
def get_result(self, result):
return SolarRunResult(result)
def run(self, resource, *args, **kwargs):
log.debug("Solard run: %s", args)
log.debug("SolarAgent run: %s", args)
client = self.get_client(resource)
res = client.run(' '.join(args), **kwargs)
return self.get_result(res)

View File

@ -1 +0,0 @@
msgpack-python>=0.4.6

View File

@ -1,51 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import os
from setuptools import find_packages
from setuptools import setup
def find_requires():
prj_root = os.path.dirname(os.path.realpath(__file__))
requirements = []
with open(u'{0}/requirements.txt'.format(prj_root), 'r') as reqs:
requirements = reqs.readlines()
return requirements
setup(
name='solard',
version='0.0.1',
description='Deployment tool daemon',
long_description="""Deployment tool daemon""",
classifiers=[
"Development Status :: 1 - Beta",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7",
"Topic :: System :: Software Distribution"],
author='Mirantis Inc.',
author_email='product@mirantis.com',
url='http://mirantis.com',
keywords='deployment',
packages=find_packages(),
zip_safe=False,
install_requires=find_requires(),
include_package_data=True,
entry_points={
'console_scripts':
['solard = solard.server:cli']}
)

View File

@ -1,108 +0,0 @@
import msgpack
import os
# TODO: handle errors
class SolardClient(object):
read_buffer = 4096
def __init__(self, auth, transport_args, transport_class=None):
if transport_class is None:
from solard.tcp_client import SolardTCPClient
transport_class = SolardTCPClient
self.auth = auth
self.sudo_transport = transport_class(*transport_args)
self.normal_transport = transport_class(*transport_args)
self.make_auth()
def make_auth(self):
self.normal_transport.auth = dict(self.auth)
self.sudo_transport.auth = dict(self.auth)
self.sudo_transport.auth['sudo'] = True
def run(self, *args, **kwargs):
if kwargs.get('use_sudo'):
transport = self.transport(use_sudo=True)
else:
transport = self.transport(use_sudo=False)
send = transport.send({'m': 'run', 'args': args, 'kwargs': kwargs})
resp = transport.resp()
return resp
def transport(self, use_sudo):
if use_sudo:
return self.sudo_transport
return self.normal_transport
def copy_directory(self, _from, _to, use_sudo=False):
# dir should open context on remote, and sync all files as one req/resp
to_cp_files = []
transport = self.transport(use_sudo)
for root, _, files in os.walk(_from):
for name in files:
single_from = os.path.join(root, name)
_to = os.path.join(root.replace(single_from, _to), name)
size = os.stat(single_from).st_size
to_cp_files.append((single_from, _to, size))
tos = [(x[1], x[2]) for x in to_cp_files]
total_size = sum((x[1] for x in tos))
data = {'m': 'copy_files',
'args': (tos, total_size),
's': True}
_ = transport.send(data)
transport.send_stream_start()
for _from, _to, _size in to_cp_files:
# sock = transport.send_stream_cont(add_size=_size)
sock = transport.send_stream_cont()
with open(_from, 'rb') as f:
while _size > 0:
data = f.read(self.read_buffer)
transport.send_stream_data(data)
_size -= len(data)
assert _size == 0 # maybe somehow below ?
transport.send_stream_end()
resp = transport.resp()
return resp
def copy_file(self, _from, _to, use_sudo=False):
transport = self.transport(use_sudo)
f_size = os.stat(_from).st_size
data = {'m': 'copy_file',
'args': (_to, f_size),
's': True}
_ = transport.send(data)
transport.send_stream_start(add_size=False)
to_read = f_size
with open(_from, 'rb') as f:
while to_read > 0:
data = f.read(self.read_buffer) # expose sendfile there
transport.send_stream_data(data)
to_read -= len(data)
assert to_read == 0
transport.send_stream_end()
return self.transport.resp()
def copy(self, _from, _to, use_sudo=False):
if os.path.isdir(_from):
resp = self.copy_directory(_from, _to, use_sudo)
else:
resp = self.copy_file(_from, _to, use_sudo)
return resp
if __name__ == '__main__':
import time
from solard.tcp_client import SolardTCPClient
c = SolardClient(auth={'user': 'vagrant', 'auth': 'password'}, transport_args=('10.0.0.3', 5555), transport_class=SolardTCPClient)
print c.run('hostname')
print c.run('whoami')
print c.run('whoami', use_sudo=True)
print c.copy('/vagrant/library', '/tmp')
# print c.copy('/tmp/a', '/tmp/bbb/b.%s' % (time.time()))
# print c.copy('/tmp/bbb', '/tmp/s/ccc%s' % (time.time()))

View File

@ -1,270 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import string
from contextlib import nested
from fabric import api as fabric_api
from subprocess import check_output
import shlex
from itertools import takewhile
from solard.logger import logger
# XXX: not used for now vvv
# def common_path(paths, sep=os.path.sep):
# paths = [x.split(sep) for x in paths]
# dirs = zip(*(p for p in paths))
# return [x[0] for x in takewhile(lambda x: all(n == x[0] for n in x[1:]), dirs)]
# class SolardContext(object):
# def __init__(self):
# self._dirs = {}
# self._files = {}
# def file(self, path):
# try:
# return self._files[path]
# except KeyError:
# if self.is_safe_file(path):
# cls = SolardSafeFile
# else:
# cls = SolardFile
# self._files[path] = f = cls(self, path)
# return f
# def dir(self, path):
# try:
# return self._dirs[path]
# except KeyError:
# self._dirs[path] = solard_dir = SolardDir(self, path)
# return solard_dir
# def is_safe_file(self, path):
# dirname = os.path.dirname(path)
# common = SolardContext.common_path(dirname, self._dirs.keys())
# if common not in ((), ('/', )):
# return False
# return True
# def is_safe_dir(self, path):
# common = SolardContext.common_path(path, self._dirs.keys())
# if common not in ((), ('/', )):
# return False
# return True
# @staticmethod
# def common_path(path, paths, sep=os.path.sep):
# all_paths = paths + [path]
# paths = [x.split(sep) for x in all_paths]
# dirs = zip(*(p for p in all_paths))
# return tuple(x[0] for x in takewhile(lambda x: all(n == x[0] for n in x[1:]), dirs))
# class SolardSafeFile(object):
# def __init__(self, context, target):
# self._f = None
# self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6)))
# self._path = target
# self._safe_path = self._path + '_' + self._rnd
# def open(self):
# self._f = open(self._safe_path, 'wb')
# def write(self, data):
# return self._f.write(data)
# def close(self):
# self._f.close()
# def finish(self):
# self.close()
# os.rename(self._safe_path, self._path)
# class SolardFile(object):
# def __init__(self, context, target):
# self._f = None
# self._path = target
# def open(self):
# self._f = open(self._path, 'wb')
# def write(self, data):
# self._f.write(data)
# def close(self):
# self._f.close()
# def finish(self):
# self.close()
# class SolardSafeDir(object):
# def __init__(self, context, target):
# self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6)))
# self._path = target
# self._safe_path = self._path + '_' + self._rnd
# def start(self):
# os.makedirs(self._safe_path)
# def finish(self):
# os.rename(self._safe_path, self._path)
# class SolardDir(object):
# def __init__(self, context, target):
# self._path = target
# def start(self):
# os.makedirs(self._path)
# def finish(self):
# pass
# XXX: not used for now ^^^
class SolardContext(object):
def __init__(self):
self.files = {}
def file(self, path):
try:
return self.files[path]
except KeyError:
self.files[path] = r = SolardFile(self, path)
return r
class SolardFile(object):
def __init__(self, context, target):
self.ctx = context
self._rnd = 'solar' + ''.join((random.choice(string.ascii_lowercase) for _ in xrange(6)))
self._path = target
self._f = None
self._safe_path = self._path + '_' + self._rnd
def open(self):
dirname = os.path.dirname(self._safe_path)
if not os.path.exists(dirname):
os.makedirs(dirname)
if self._f is None:
self._f = open(self._safe_path, 'wb')
def write(self, data):
self._f.write(data)
def finish(self):
self._f.close()
self._f = None
os.rename(self._safe_path, self._path)
class SolardIface(object):
@staticmethod
def run(solard_context, cmd, **kwargs):
# return check_output(shlex.split(cmd))
executor = fabric_api.local
# if kwargs.get('use_sudo', False):
# cmd = 'sudo ' + cmd
managers = []
cwd = kwargs.get('cwd')
if cwd:
managers.append(fabric_api.cd(kwargs['cwd']))
env = kwargs.get('env')
if env:
managers.append(fabric_api.shell_env(**kwargs['env']))
# we just warn, don't exit on solard
# correct data is returned
managers.append(fabric_api.warn_only())
with nested(*managers):
out = executor(cmd, capture=True)
result = {}
for name in ('failed', 'return_code', 'stdout', 'stderr',
'succeeded', 'command', 'real_command'):
result[name] = getattr(out, name)
return result
@staticmethod
def copy_file(solard_context, stream_reader, path, size=None):
f = SolardIface.file_start(solard_context, path)
rdr = stream_reader(size)
for data in rdr:
f.write(data)
SolardIface.file_end(solard_context, path)
return True
@staticmethod
def copy_files(solard_context, stream_reader, paths, total_size):
# total_size not used for now
for _to, _size in paths:
logger.debug("Starting %s size=%d", _to, _size)
f = SolardIface.file_start(solard_context, _to)
if _size > 0:
rdr = stream_reader(_size)
for data in rdr:
f.write(data)
SolardIface.file_end(solard_context, _to)
logger.debug("Done %s size=%d", _to, _size)
return True
# # TODO: not used YET fully
# @staticmethod
# def dir_start(solard_context, path):
# solard_dir = solard_context.dir(path)
# solard_dir.start()
# return solard_dir
# @staticmethod
# def dir_finish(solard_context, path):
# solard_dir = solard_context.dir(path)
# solard_dir.finish()
# return True
@staticmethod
def file_start(solard_context, path):
solard_file = solard_context.file(path)
solard_file.open()
return solard_file
@staticmethod
def file_put_data(solard_context, path, data):
solard_file = solard_context.file(path)
return solard_file.write(data)
@staticmethod
def file_end(solard_context, path):
solard_file = solard_context.file(path)
solard_file.finish()
return True

View File

@ -1,37 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import logging
def __setup_logger():
logger = logging.getLogger("solard")
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s (%(filename)s::%(lineno)s)::%(message)s')
stream = logging.StreamHandler()
stream.setLevel(logging.DEBUG)
stream.setFormatter(formatter)
logger.addHandler(stream)
return logger
__global_logger = None
def get_logger():
global __global_logger
if not __global_logger:
__global_logger = __setup_logger()
return __global_logger
logger = get_logger()

View File

@ -1,26 +0,0 @@
import click
@click.group()
def cli():
pass
def validate_class(ctx, param, value):
supported = ('tcp', )
if not value in supported:
raise click.BadParameter("%r is not one of %r" % (value, supported))
return value
@cli.command()
@click.option('--base', default='tcp', callback=validate_class, type=str)
@click.option('--port', default=5555, type=int)
def run(base, port):
if base == 'tcp':
from solard.tcp_server import SolardTCPServer
runner = SolardTCPServer.run_solard
runner(port)
if __name__ == '__main__':
cli()

View File

@ -1,180 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import msgpack
import socket
import errno
import struct
from solard.tcp_core import *
CLIENT_BUFF = 4096
class ClientException(Exception):
pass
class ReadError(ClientException):
pass
class RemoteException(ClientException):
pass
class RemoteFailure(ClientException):
pass
class SolardTCPClient(object):
def __init__(self, host, port, **kwargs):
self.host = host
self.port = port
# self._connect_timeout = kwargs.get("connect_timeout", None)
self._socket_timeout = kwargs.get("socket_timeout", None)
self.sock = None
self.auth = None
self._streaming = False
def connect(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.settimeout(self._socket_timeout)
sock.connect((self.host, self.port))
except Exception:
sock.close()
raise
else:
self.sock = sock
if not self.initialize_with_auth():
self.sock = None
raise ClientException("Auth failed")
return sock
def initialize_with_auth(self):
self.send(self.auth)
resp = self.resp(close=False)
return resp
def disconnect(self):
sock = self.sock
try:
sock.shutdown(socket.SHUT_RDWR)
except Exception as e:
err_ = getattr(e, 'errno', None)
if err_ not in (errno.ENOTCONN, errno.EBADF):
raise
ret = sock.close()
self.sock = None
return ret
def send(self, data):
assert self._streaming is False
if self.sock is None:
self.connect()
_data = msgpack.packb(data)
size = len(_data)
hdr = struct.pack(HDR, size)
return self.sock.sendall(hdr + _data)
def send_stream_start(self, add_size=False):
assert self._streaming is False
self._streaming = True
if add_size is not False:
hdr = struct.pack(HDR, add_size)
self.sock.sendall(hdr)
return self.sock
def send_stream_cont(self, add_size=False):
assert self._streaming is True
if add_size is not False:
hdr = struct.pack(HDR, add_size)
self.sock.sendall(hdr)
return self.sock
def send_stream_end(self):
assert self._streaming is True
self._streaming = False
return self.sock
def send_stream_data(self, data):
assert self._streaming is True
self.sock.sendall(data) # TODO: expose sendfile easier
# self._streaming = False
def read(self):
sock = self.sock
d = sock.recv(HDR_SIZE)
if not len(d) == HDR_SIZE:
raise ReadError()
size = struct.unpack(HDR, d)[0]
d = []
while True:
b = min(size, CLIENT_BUFF)
curr = sock.recv(b)
d.append(curr)
size -= len(curr)
if not size:
break
return msgpack.unpackb(''.join(d))
def _resp_result_gen(self, data):
st = data['st']
if st == REPLY_GEN_OK: # OK
return data['res']
elif st == REPLY_GEN_END:
raise StopIteration()
else:
raise RemoteException(data)
def _resp_result_stream(self, data):
return data
def _resp_result(self, data):
st = data['st']
if st == REPLY_OK: # OK
return data['res']
elif st == REPLY_ERR:
raise RemoteException(data)
else:
raise RemoteFailure(data)
def _resp_gen(self, res, close):
try:
while True:
yield res
try:
res = self._resp_result_gen(self.read())
except StopIteration:
break
except Exception:
raise RemoteException(res)
finally:
if close:
self.disconnect()
def resp(self, close=True):
recv = self.read()
st = recv['st']
if REPLY_GEN_OK <= st <= REPLY_GEN_END:
return self._resp_gen(recv, close)
try:
res = self._resp_result(recv)
finally:
if close:
self.disconnect()
return res

View File

@ -1,27 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
import struct
REPLY_OK = 2
REPLY_GEN_OK = 20
REPLY_GEN_END = 21
REPLY_FAIL = 0
REPLY_ERR = 1
HDR = "<I"
HDR_SIZE = struct.calcsize(HDR)
INT_DEFAULT_REPLY_TYPE = 0
INT_GENERATOR_REPLY_TYPE = 1

View File

@ -1,320 +0,0 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License attached#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See then
# License for the specific language governing permissions and limitations
# under the License.
# from gevent import monkey
# monkey.patch_all()
# from gevent.server import StreamServer
from SocketServer import ThreadingTCPServer, BaseRequestHandler
import socket
import threading
import errno
import msgpack
import time
import struct
import errno
import sys
import traceback
import pwd
import os
from types import GeneratorType
from solard.logger import logger
from solard.core import SolardContext, SolardIface
from solard.tcp_core import *
SERVER_BUFF = 4096
class SolardTCPException(Exception):
pass
class ReadFailure(SolardTCPException):
pass
class SolardTCPHandler(object):
def __init__(self, sock, address):
self.sock = sock
self.address = address
self.ctx = SolardContext()
self.auth = None
self._wrote = False
self.forked = False
def _read(self):
# TODO: client closed connection
try:
size = struct.unpack(HDR, self.sock.recv(HDR_SIZE))[0]
except:
raise ReadFailure("Can't read header data")
d = []
while True:
b = min(size, SERVER_BUFF)
curr = self.sock.recv(b)
if not curr:
raise ReadFailure("No data")
d.append(curr)
size -= len(curr)
assert size >= 0
if not size:
break
self._wrote = False
try:
return msgpack.unpackb(''.join(d))
except:
raise
def _read_stream(self, size=None):
if size is None:
try:
size = struct.unpack(HDR, self.sock.recv(HDR_SIZE))[0]
except:
raise ReadFailure("Can't read header data")
while True:
b = min(size, SERVER_BUFF)
curr = self.sock.recv(b)
if not curr:
if size > 0:
raise ReadFailure("Expected more data")
size -= len(curr)
assert size >= 0
yield curr
if not size:
break
def _write(self, **kwargs):
assert self._wrote is False
_data = msgpack.packb(kwargs)
size = len(_data)
hdr = struct.pack(HDR, size)
self.sock.sendall(hdr + _data)
self._wrote = True
def _write_ok(self, res):
# logger.debug("Ok sent")
data = {'st': REPLY_OK, 'res': res}
self._write(**data)
def _write_ok_gen(self, res):
data = {'st': REPLY_GEN_OK, 'res': res}
self._write(**data)
# def _write_ok_stream(self, res):
# data = {'st': 30, 'res': res}
# self._write(**data)
# def _write_stream_data(self, data):
# self.sock.sendall(data)
def _write_gen_end(self):
data = {'st': REPLY_GEN_END, 'res': None}
self._write(**data)
def _write_failure(self, exception, reason, tb=""):
data = {'st': REPLY_FAIL, 'exception': exception, 'reason': reason, 'tb': tb}
self._write(**data)
def _write_err(self, error):
logger.info("Client error: %s" % error)
data = {'st': REPLY_ERR, 'error': error}
self._write(**data)
def make_auth(self):
# it's responsible for:
# - checking auth
# - forking if needed
auth_data = self._read()
if not auth_data:
self._write_ok(False)
return False
req_user = auth_data.get('user')
if not req_user:
self._write_ok(False)
return False
proc_user = pwd.getpwuid(os.getuid())[0]
logger.debug("Requested user %r", req_user)
# TODO:
# we may add there anything we want, checking in file etc
# for now it's just `password`
valid = auth_data.get('auth') == 'password'
if not valid:
self._write_ok(False)
return False
if req_user == proc_user:
self._write_ok(True)
return True
# TODO: very naive
if auth_data.get('sudo'):
self._write_ok(True)
return True
# fork there
child_pid = os.fork()
if child_pid == 0:
pw_uid = pwd.getpwnam(req_user).pw_uid
pw_gid = pwd.getpwuid(pw_uid).pw_gid
os.setgid(pw_gid)
os.setuid(pw_uid)
logger.debug("Child forked %d", os.getpid())
self._fix_env(pw_uid)
self.forked = True
self._write_ok(True)
return True
return None
def _fix_env(self, pw_uid):
pw_dir = pwd.getpwuid(pw_uid).pw_dir
os.environ['HOME'] = pw_dir
def process(self):
try:
known_type = INT_DEFAULT_REPLY_TYPE
input_data = self._read()
if not input_data:
return False
method = input_data['m']
meth = getattr(SolardIface, method)
is_stream = input_data.get('s', False)
logger.debug("Going to run %r", method)
if is_stream:
res = meth(self.ctx, self._read_stream, *input_data.get('args', ()), **input_data.get('kwargs', {}))
else:
res = meth(self.ctx, *input_data.get('args', ()), **input_data.get('kwargs', {}))
if isinstance(res, GeneratorType):
known_type = INT_GENERATOR_REPLY_TYPE
try:
for curr in res:
self._wrote = False
self._write_ok_gen(curr)
except:
raise
finally:
try:
self._wrote = False
self._write_gen_end()
except Exception: # ignore if eng gen couldn't be send
pass
self._wrote = True
else:
# if not input_data.get('empty_ok_resp', False):
self._write_ok(res)
except ReadFailure:
return False
except Exception as ex:
if self._wrote:
if known_type == INT_GENERATOR_REPLY_TYPE:
errno_ = getattr(ex, 'errno', None)
if errno_ in (errno.EPIPE, errno.ECONNRESET):
logger.debug(
"Client disconnected during generator based reply")
else:
logger.debug("Error during generator based reply")
raise
else:
logger.error("Already wrote data, but got exception")
raise
else:
logger.exception("Got exception")
self.handle_exception()
return True
def handle_exception(self):
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.format_exception(exc_type, exc_value, exc_traceback)
reason = str(exc_value)
if not reason:
reason = exc_type.__name__
try:
self._write_failure(exc_type.__name__, reason, tb)
except:
logger.warn("Failure when sending error response")
raise
finally:
logger.exception("Got exception")
class SolardReqHandler(BaseRequestHandler):
def handle(self):
sock = self.request
address = self.client_address
h = SolardTCPHandler(sock, address)
try:
logger.debug("New from %s:%d" % address)
auth_state = h.make_auth()
if auth_state is False:
logger.debug("Failed auth")
return
if auth_state is None:
# child forked
# we don't wait there, but in recycler
return
while True:
if not h.process():
logger.debug("End from %s:%d" % address)
break
else:
logger.debug("Waiting for more from %s:%d" % address)
try:
sock.shutdown(socket.SHUT_RDWR)
except Exception:
pass
finally:
sock.close()
if h.forked:
# if forked we can safely exit now
os._exit(0)
class SolardTCPServer(ThreadingTCPServer):
allow_reuse_address = True
def __init__(self, *args, **kwargs):
# StreamServer.__init__(self, *args, **kwargs)
ThreadingTCPServer.__init__(self, *args, **kwargs)
def dummy_recycle_childs(self):
# dummy child recycler, turns each 3 seconds
def child_recycler():
while True:
try:
pid, status = os.waitpid(-1, 0)
logger.debug("Child %r ended with status=%d", pid, status)
except OSError as e:
if e.errno != errno.ECHILD:
raise
time.sleep(3)
th = threading.Thread(target=child_recycler)
th.daemon = True
th.start()
@staticmethod
def run_solard(port):
s = SolardTCPServer(('0.0.0.0', port), SolardReqHandler)
s.dummy_recycle_childs()
return s.serve_forever()
if __name__ == '__main__':
SolardTCPServer.run_solard(5555)