add write() to BlitzPy config Classes

This commit is contained in:
frennkie 2020-05-23 12:51:30 +01:00
parent cd38663cd6
commit 5003493847
4 changed files with 345 additions and 256 deletions

View File

@ -5,6 +5,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.2.0] - 2020-05-23
### Added
- add write() to BlitzPy config Classes
## [0.1.0] - 2020-05-22 (Bitcoin Pizza Day)
### Added

View File

@ -1,49 +1,73 @@
# BlitzPy
BlitzPy is a part of the RaspiBlitz project and implements a few common use cases.
## Installation
### Prerequisite
None
### Dependencies
None
### Install BlitzPy
```
cd ~/raspiblitz/home.admin/BlitzPy
pip install dist/BlitzPy-0.1.0-py2.py3-none-any.whl
OR
sudo -H python -m pip install dist/BlitzPy-0.1.0-py2.py3-none-any.whl
```
**or** consider using a virtual environment
```
python3 -m venv --system-site-packages venv
source venv/bin/activate
pip install BlitzPy
```
## Installation
Import and use..
```
from blitzpy import RaspiBlitzConfig
cfg = RaspiBlitzConfig()
cfg.reload()
print(cfg.__dict__)
print(cfg.hostname)
if cfg.run_behind_tor:
print("using TOR!")
```
## License
[MIT License](http://en.wikipedia.org/wiki/MIT_License)
# BlitzPy
BlitzPy is a part of the RaspiBlitz project and implements a few common use cases.
## Installation
### Prerequisite
None
### Dependencies
None
### Install BlitzPy
```
cd ~/raspiblitz/home.admin/BlitzPy
pip install dist/BlitzPy-0.1.0-py2.py3-none-any.whl
OR
sudo -H python -m pip install dist/BlitzPy-0.1.0-py2.py3-none-any.whl
```
**or** consider using a virtual environment
```
python3 -m venv --system-site-packages venv
source venv/bin/activate
pip install BlitzPy
```
## Usage
### Import and use..
```
from blitzpy import RaspiBlitzConfig
cfg = RaspiBlitzConfig()
cfg.reload()
print(cfg.hostname)
if cfg.run_behind_tor:
print("using TOR!")
```
### Changing values
In order to change the content of a setting the `value` attribute needs to be updated!
```
from blitzpy import RaspiBlitzConfig
cfg = RaspiBlitzConfig()
cfg.reload()
print(cfg.hostname)
cfg.hostname.value = "New-Hostname!"
print(cfg.hostname)
```
### Exporting
Use `cfg.write()` to export file (will use default path - override with cfg.write(path="/tmp/foobar.conf").
```
from blitzpy import RaspiBlitzConfig
cfg = RaspiBlitzConfig()
cfg.reload()
cfg.rtl_web_interface.value = True
cfg.write()
```
## License
[MIT License](http://en.wikipedia.org/wiki/MIT_License)

View File

@ -1,206 +1,268 @@
# -*- coding: utf-8 -*-
import logging
import os
from configparser import ConfigParser, DEFAULTSECT
log = logging.getLogger(__name__)
class LndConfig(object):
def __init__(self, abs_path="/mnt/hdd/lnd/lnd.conf"):
self.abs_path = abs_path
# default values for LND Configuration
self.rpc_listen = ""
@property
def rpc_listen_host(self):
return self.rpc_listen.split(":")[0]
@property
def rpc_listen_port(self):
try:
return int(self.rpc_listen.split(":")[1])
except (IndexError, TypeError, ValueError):
return 0
def reload(self):
"""load config from file"""
parser = ConfigParser()
log.debug("loading config from file: {}".format(self.abs_path))
with open(self.abs_path) as f:
parser.read_string(f.read())
app_options = parser["Application Options"]
self.rpc_listen = get_str_clean(app_options, "rpclisten", self.rpc_listen)
class RaspiBlitzConfig(object):
def __init__(self, abs_path="/mnt/hdd/raspiblitz.conf"):
self.abs_path = abs_path
# default values for RaspiBlitz Configuration
self.auto_nat_discovery = False
self.auto_pilot = False
self.auto_unlock = False
self.chain = ""
self.dynDomain = ""
self.dyn_update_url = ""
self.hostname = ""
self.invoice_allow_donations = False
self.invoice_default_amount = 402
self.lcd_rotate = False
self.lnd_address = ""
self.lnd_port = ""
self.network = ""
self.public_ip = ""
self.rtl_web_interface = False
self.run_behind_tor = False
self.ssh_tunnel = ""
self.touchscreen = False
self.version = ""
def reload(self):
"""load config from file"""
parser = ConfigParser()
log.debug("loading config from file: {}".format(self.abs_path))
with open(self.abs_path) as f:
parser.read_string("[{}]\n".format(DEFAULTSECT) + f.read())
default_s = parser[DEFAULTSECT]
self.auto_nat_discovery = get_boolean_safe(default_s, "autoNatDiscovery", self.auto_nat_discovery)
self.auto_pilot = get_boolean_safe(default_s, "autoPilot", self.auto_pilot)
self.auto_unlock = get_boolean_safe(default_s, "autoUnlock", self.auto_unlock)
self.chain = get_str_clean(default_s, "chain", self.chain)
self.dynDomain = get_str_clean(default_s, "dynDomain", self.dynDomain)
self.dyn_update_url = get_str_clean(default_s, "dynUpdateUrl", self.dyn_update_url)
self.hostname = get_str_clean(default_s, "hostname", self.hostname)
self.invoice_allow_donations = get_boolean_safe(default_s, "invoiceAllowDonations",
self.invoice_allow_donations)
self.invoice_default_amount = get_int_safe(default_s, "invoiceDefaultAmount", self.invoice_default_amount)
self.lcd_rotate = get_boolean_safe(default_s, "lcdrotate", self.lcd_rotate)
self.lnd_address = get_str_clean(default_s, "lndAddress", self.lnd_address)
self.lnd_port = get_str_clean(default_s, "lndPort", self.lnd_port)
self.network = get_str_clean(default_s, "network", self.network)
self.public_ip = get_str_clean(default_s, "publicIP", self.public_ip)
self.rtl_web_interface = get_boolean_safe(default_s, "rtlWebinterface", self.rtl_web_interface)
self.run_behind_tor = get_boolean_safe(default_s, "runBehindTor", self.run_behind_tor)
self.ssh_tunnel = get_str_clean(default_s, "sshtunnel", self.ssh_tunnel)
self.touchscreen = get_boolean_safe(default_s, "touchscreen", self.touchscreen)
self.version = get_str_clean(default_s, "raspiBlitzVersion", self.version)
class RaspiBlitzInfo(object):
def __init__(self, abs_path="/home/admin/raspiblitz.info"):
self.abs_path = abs_path
# default values for RaspiBlitz Info
self.base_image = ""
self.chain = ""
self.message = ""
self.network = ""
self.setup_step = 0
self.state = ""
self.undervoltage_reports = 0
def reload(self):
"""load config from file"""
parser = ConfigParser()
log.debug("loading config from file: {}".format(self.abs_path))
with open(self.abs_path) as f:
parser.read_string("[{}]\n".format(DEFAULTSECT) + f.read())
default_s = parser[DEFAULTSECT]
self.base_image = get_str_clean(default_s, "baseimage", self.base_image)
self.chain = get_str_clean(default_s, "chain", self.chain)
self.message = get_str_clean(default_s, "message", self.message)
self.network = get_str_clean(default_s, "network", self.network)
self.setup_step = get_int_safe(default_s, "setupStep", self.setup_step)
self.state = get_str_clean(default_s, "state", self.state)
self.undervoltage_reports = get_int_safe(default_s, "undervoltageReports", self.undervoltage_reports)
def get_boolean_safe(cp_section, key, default_value):
"""take a ConfigParser section, get key that might be string encoded boolean and return boolean"""
try:
value = cp_section.getboolean(key, default_value)
except ValueError:
_value = cp_section.get(key)
value = bool(_value.strip("'").strip('"')) # this will raise an Exception if int() fails!
return value
def get_int_safe(cp_section, key, default_value):
"""take a ConfigParser section, get key that might be string encoded int and return int"""
try:
value = cp_section.getint(key, default_value)
except ValueError:
_value = cp_section.get(key)
value = int(_value.strip("'").strip('"')) # this will raise an Exception if int() fails!
return value
def get_str_clean(cp_section, key, default_value):
"""take a ConfigParser section, get key and strip leading and trailing \' and \" chars"""
value = cp_section.get(key, default_value)
if not value:
return ""
return value.lstrip('"').lstrip("'").rstrip('"').rstrip("'")
def main():
lnd_cfg = LndConfig()
if os.path.exists(lnd_cfg.abs_path):
lnd_cfg.reload()
print("=======\n= LND =\n=======")
print("rpc_list: \t\t{}".format(lnd_cfg.rpc_listen))
print("rpc_list_host: \t\t{}".format(lnd_cfg.rpc_listen_host))
print("rpc_list_port: \t\t{}".format(lnd_cfg.rpc_listen_port))
print("")
rb_cfg = RaspiBlitzConfig()
if os.path.exists(rb_cfg.abs_path):
rb_cfg.reload()
print("====================\n= RaspiBlitzConfig =\n====================")
print("auto_nat_discovery: \t\t{}".format(rb_cfg.auto_nat_discovery))
print("auto_pilot: \t\t\t{}".format(rb_cfg.auto_pilot))
print("auto_unlock: \t\t\t{}".format(rb_cfg.auto_unlock))
print("chain: \t\t\t\t{}".format(rb_cfg.chain))
print("dynDomain: \t\t\t{}".format(rb_cfg.dynDomain))
print("dyn_update_url: \t\t{}".format(rb_cfg.dyn_update_url))
print("hostname: \t\t\t{}".format(rb_cfg.hostname))
print("invoice_allow_donations: \t{}".format(rb_cfg.invoice_allow_donations))
print("invoice_default_amount: \t{}".format(rb_cfg.invoice_default_amount))
print("lcd_rotate: \t\t\t{}".format(rb_cfg.lcd_rotate))
print("lnd_address: \t\t\t{}".format(rb_cfg.lnd_address))
print("lnd_port: \t\t\t{}".format(rb_cfg.lnd_port))
print("network: \t\t\t{}".format(rb_cfg.network))
print("public_ip: \t\t\t{}".format(rb_cfg.public_ip))
print("rtl_web_interface: \t\t{}".format(rb_cfg.rtl_web_interface))
print("run_behind_tor: \t\t{}".format(rb_cfg.run_behind_tor))
print("ssh_tunnel: \t\t\t{}".format(rb_cfg.ssh_tunnel))
print("touchscreen: \t\t\t{}".format(rb_cfg.touchscreen))
print("version: \t\t\t{}".format(rb_cfg.version))
print("")
rb_info = RaspiBlitzInfo()
if os.path.exists(rb_info.abs_path):
rb_info.reload()
print("==================\n= RaspiBlitzInfo =\n==================")
print("state: \t\t{}".format(rb_info.state))
print("")
if __name__ == "__main__":
main()
# -*- coding: utf-8 -*-
import configparser
import copy
import logging
import os
from configparser import DEFAULTSECT, RawConfigParser
log = logging.getLogger(__name__)
_UNSET = object()
class CustomConfigParser(RawConfigParser):
def get(self, section, option, *, raw=False, vars=None, fallback=_UNSET):
val = RawConfigParser.get(self, section, option)
return val.strip('"').strip("'")
class BaseSetting(object):
TYPE = str
def __init__(self, key, aliases=None, default=_UNSET):
self.key = key
self.aliases = aliases
self.default = default
# hidden attributes for properties
self._is_set = False
self._value = None
@property
def is_set(self):
return self._is_set
@property
def export(self):
if self.TYPE == bool:
if self.value:
return f"{self.key}='on'"
else:
return f"{self.key}='off'"
elif self.TYPE == int:
return f"{self.key}={self.value}"
else:
return f"{self.key}='{self.value}'"
@property
def value(self):
if self._is_set:
return self._value
if self.default is not _UNSET:
return self.default
return self.TYPE()
@value.setter
def value(self, new_value):
if not isinstance(new_value, self.TYPE):
raise ValueError(f'must be of type: {self.TYPE}')
self._is_set = True
self._value = new_value
def __eq__(self, other):
return self.value == other
def __ne__(self, other):
return not self.value == other
def __repr__(self):
if not self._is_set:
return f'<{self.__class__.__name__} [{self.key}]: <not set> Default: {self.default}>'
return f'<{self.__class__.__name__} [{self.key}]: {self._value} Default: {self.default}>'
def __str__(self):
return f'{self.key}: {self.value}'
def get(self, section):
"""gets the value for this settings from a ConfigParser.section instance"""
try:
if self.TYPE == bool:
self.value = section.getboolean(self.key, fallback=self.default)
elif self.TYPE == int:
self.value = section.getint(self.key, fallback=self.default)
else:
self.value = section.get(self.key, fallback=self.default)
except configparser.NoOptionError:
pass
class BoolSetting(BaseSetting):
TYPE = bool
class IntSetting(BaseSetting):
TYPE = int
class StrSetting(BaseSetting):
pass
class BaseConfig(object):
def __init__(self, *args, **kwargs):
self.abs_path = None
# store raw file content (list of lines)
self.raw_lines = []
# initialize Custom Config Parser
self.parser = CustomConfigParser(delimiters=["="], strict=False)
@property
def settings(self):
return {attr: value for attr, value in self.__dict__.items() if isinstance(value, BaseSetting)}
@property
def settings_by_keys(self):
return {value.key: value for attr, value in self.__dict__.items() if isinstance(value, BaseSetting)}
@property
def settings_by_aliases_and_keys(self):
ret = dict()
for attr, value in self.__dict__.items():
if isinstance(value, BaseSetting):
ret.update({value.key: value})
if value.aliases:
for alias in value.aliases:
ret.update({alias: value})
return ret
def reload(self):
"""load (or reload) config from file"""
log.debug("loading config from file: {}".format(self.abs_path))
with open(self.abs_path) as f:
raw = f.read()
self.raw_lines = raw.split('\n')
self.parser.read_string("[{}]\n".format(DEFAULTSECT) + raw)
default_s = self.parser[DEFAULTSECT]
for attr, setting in self.settings.items():
setting.get(default_s)
def write(self, path=None):
if not path:
path = self.abs_path
keys_to_process = copy.deepcopy(self.settings_by_aliases_and_keys)
keys_processed = list()
export_lines = list()
for line in self.raw_lines:
line_key = line.split('=')[0]
if line_key in keys_to_process:
setting = self.settings_by_aliases_and_keys[line_key]
export_lines.append(f'{setting.export}\n'.encode())
keys_processed.append(line_key)
keys_to_process.pop(line_key)
if setting.aliases:
for alias in setting.aliases:
try:
keys_to_process.pop(alias)
except KeyError:
pass
else:
# append unknown row as is
export_lines.append(f'{line}\n'.encode())
with open(path, 'wb') as f:
if export_lines[-1] == b'\n':
# remove 1 trailing blank line
export_lines = export_lines[:-1]
f.writelines(export_lines)
if keys_processed:
print("[INFO] Keys processed:\n{}".format(', '.join(keys_processed)))
else:
print("[WARN] Keys processed: None")
if keys_to_process:
print("[WARN] Keys or Aliases not found:\n{}".format(', '.join(keys_to_process)))
else:
print("[INFO] Keys or Aliases not found: None")
class RaspiBlitzConfig(BaseConfig):
def __init__(self, abs_path="/mnt/hdd/raspiblitz.conf"):
super().__init__()
self.abs_path = abs_path
# default values for RaspiBlitz Configuration
self.auto_nat_discovery = BoolSetting('autoNatDiscovery', default=False)
self.auto_pilot = BoolSetting('autoPilot', default=False)
self.auto_unlock = BoolSetting('autoUnlock', default=False)
self.chain = StrSetting('chain', default='main')
self.dyn_domain = StrSetting('dynDomain')
self.dyn_update_url = StrSetting('dynUpdateUrl')
self.hostname = StrSetting('hostname')
self.invoice_allow_donations = BoolSetting('invoiceAllowDonations', default=False)
self.invoice_default_amount = IntSetting('invoiceDefaultAmount', default=402)
self.lcd_rotate = BoolSetting('lcdrotate', default=False)
self.lnd_address = StrSetting('lndAddress')
self.lnd_port = StrSetting('lndPort')
self.network = StrSetting('network', default='bitcoin')
self.public_ip = StrSetting('publicIP')
self.rtl_web_interface = BoolSetting('rtlWebinterface', default=False)
self.run_behind_tor = BoolSetting('runBehindTor', default=False)
self.ssh_tunnel = StrSetting('sshtunnel')
self.touchscreen = BoolSetting('touchscreen', default=False)
self.version = StrSetting('raspiBlitzVersion')
self.lnbits = BoolSetting('LNBits', aliases=['LNbits', 'lnbits'], default=False)
class RaspiBlitzInfo(BaseConfig):
def __init__(self, abs_path="/home/admin/raspiblitz.info"):
super().__init__()
self.abs_path = abs_path
# default values for RaspiBlitz Info
self.base_image = StrSetting('base_image')
self.chain = StrSetting('chain')
self.message = StrSetting('message')
self.network = StrSetting('network')
self.setup_step = IntSetting('setup_step', default=0)
self.state = StrSetting('state')
self.undervoltage_reports = IntSetting('undervoltage_reports', default=0)
def main():
rb_cfg = RaspiBlitzConfig()
if os.path.exists(rb_cfg.abs_path):
rb_cfg.reload()
print("====================\n= RaspiBlitzConfig =\n====================")
print("auto_nat_discovery: \t\t{}".format(rb_cfg.auto_nat_discovery))
print("auto_pilot: \t\t\t{}".format(rb_cfg.auto_pilot))
print("auto_unlock: \t\t\t{}".format(rb_cfg.auto_unlock))
print("chain: \t\t\t\t{}".format(rb_cfg.chain))
print("dynDomain: \t\t\t{}".format(rb_cfg.dyn_domain))
print("dyn_update_url: \t\t{}".format(rb_cfg.dyn_update_url))
print("hostname: \t\t\t{}".format(rb_cfg.hostname))
print("invoice_allow_donations: \t{}".format(rb_cfg.invoice_allow_donations))
print("invoice_default_amount: \t{}".format(rb_cfg.invoice_default_amount))
print("lcd_rotate: \t\t\t{}".format(rb_cfg.lcd_rotate))
print("lnd_address: \t\t\t{}".format(rb_cfg.lnd_address))
print("lnd_port: \t\t\t{}".format(rb_cfg.lnd_port))
print("network: \t\t\t{}".format(rb_cfg.network))
print("public_ip: \t\t\t{}".format(rb_cfg.public_ip))
print("rtl_web_interface: \t\t{}".format(rb_cfg.rtl_web_interface))
print("run_behind_tor: \t\t{}".format(rb_cfg.run_behind_tor))
print("ssh_tunnel: \t\t\t{}".format(rb_cfg.ssh_tunnel))
print("touchscreen: \t\t\t{}".format(rb_cfg.touchscreen))
print("version: \t\t\t{}".format(rb_cfg.version))
print("")
rb_info = RaspiBlitzInfo()
if os.path.exists(rb_info.abs_path):
rb_info.reload()
print("==================\n= RaspiBlitzInfo =\n==================")
print("state: \t\t{}".format(rb_info.state))
print("")
if __name__ == "__main__":
main()

View File

@ -4,5 +4,5 @@
# 3) we can import it into your module module
"""
__version_info__ = ('0', '1', '0')
__version_info__ = ('0', '2', '0')
__version__ = '.'.join(__version_info__)