#!/usr/bin/env python3
"""Module containing the Cp2kPrep class and the command line interface."""
import argparse
import os
import collections.abc
from pathlib import Path
from biobb_common.generic.biobb_object import BiobbObject
from biobb_common.configuration import settings
from biobb_common.tools import file_utils as fu
from biobb_common.tools.file_utils import launchlogger
from biobb_cp2k.cp2k.common import check_input_path, check_output_path
[docs]class Cp2kPrep(BiobbObject):
"""
| biobb_cp2k Cp2kPrep
| Helper bb to prepare inputs for the `CP2K QM tool <https://www.cp2k.org/>`_ module.
| Prepares input files for the CP2K QM tool.
Args:
input_inp_path (str) (Optional): Input configuration file (CP2K run options). File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/cp2k_energy.inp>`_. Accepted formats: pdb (edam:format_1476).
input_pdb_path (str) (Optional): Input PDB file. File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/H2O_box.pdb>`_. Accepted formats: pdb (edam:format_1476).
input_rst_path (str) (Optional): Input restart file (WFN). File type: input. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/data/cp2k/cp2k.wfn>`_. Accepted formats: wfn (edam:format_2333).
output_inp_path (str): Output CP2K input configuration file. File type: output. `Sample file <https://github.com/bioexcel/biobb_cp2k/raw/master/biobb_cp2k/test/reference/cp2k/cp2k_prep_out.inp>`_. Accepted formats: inp (edam:format_2330), in (edam:format_2330), txt (edam:format_2330).
properties (dict - Python dictionary object containing the tool parameters, not input/output files):
* **simulation_type** (*str*) - ("energy") Default options for the cp2k_in file. Each creates a different inp file. Values: `energy <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_energy.inp>`_ (Computes Energy and Forces), `geom_opt <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_geom_opt.inp>`_ (Runs a geometry optimization), `md <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_md.inp>`_ (Runs an MD calculation), `mp2 <https://biobb-cp2k.readthedocs.io/en/latest/_static/cp2k_in/cp2k_mp2.inp>`_ (Runs an MP2 calculation).
* **cp2k_in** (*dict*) - ({}) CP2K run options specification.
* **cell_cutoff** (*float*) - (5.0) CP2K cell cutoff, to build the cell around the system (only used if input_pdb_path is defined).
* **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
* **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
Examples:
This is a use example of how to use the building block from Python::
from biobb_cp2k.cp2k.cp2k_prep import cp2k_prep
prop = {
'simulation_type': 'geom_opt'
}
cp2k_prep(input_pdb_path='/path/to/input.pdb',
input_inp_path='/path/to/cp2k_in.inp',
output_inp_path='/path/to/cp2k_out.inp',
properties=prop)
Info:
* wrapped_software:
* name: In house
* license: Apache-2.0
* ontology:
* name: EDAM
* schema: http://edamontology.org/EDAM.owl
"""
def __init__(self, output_inp_path: str,
input_pdb_path: str = None, input_inp_path: str = None, input_rst_path: str = None,
properties: dict = None, **kwargs) -> None:
properties = properties or {}
# Call parent class constructor
super().__init__(properties)
self.locals_var_dict = locals().copy()
# Input/Output files
self.io_dict = {
'in': {'input_pdb_path': input_pdb_path,
'input_inp_path': input_inp_path,
'input_rst_path': input_rst_path},
'out': {'output_inp_path': output_inp_path}
}
# Properties specific for BB
self.properties = properties
self.simulation_type = properties.get('simulation_type', "energy")
self.cell_cutoff = properties.get('cell_cutoff', 5.0)
self.cp2k_in = properties.get('cp2k_in', dict())
# self.cp2k_in = {k: str(v) for k, v in properties.get('cp2k_in', dict()).items()}
# Check the properties
self.check_properties(properties)
self.check_arguments()
[docs] def iterdict(self, d, depth, fileout_h):
for k, v in d.items():
if k.upper() == "FORCE_EVAL" or k.upper() == "MOTION":
depth = 0
elif "-" in k:
k = k.split("-")[0]
if isinstance(v, dict):
depth = depth+1
if 'name' in v.keys():
print(' ' * depth + "&" + k.upper(), v['name'], file=fileout_h)
else:
print(' ' * depth + "&" + k.upper(), file=fileout_h)
self.iterdict(v, depth, fileout_h)
print(' ' * depth + "&END", k.upper(), file=fileout_h)
depth = depth-1
else:
if k.isnumeric():
print(' ' * depth, v, file=fileout_h)
elif isinstance(v, list):
if not isinstance(v[0], dict):
print(' ' * depth, k, ' '.join(v), file=fileout_h)
elif isinstance(v[0], dict):
depth = depth+1
if k.upper() == 'KIND':
for atom in v:
print(' ' * depth + "&" + k.upper(), atom['name'], file=fileout_h)
self.iterdict(atom, depth, fileout_h)
print(' ' * depth + "&END", k.upper(), file=fileout_h)
elif k.upper() == 'COORD':
print(' ' * depth + "&" + k.upper(), file=fileout_h)
for atom in v:
self.iterdict(atom, depth, fileout_h)
else:
print(' ' * depth + "&" + k.upper(), file=fileout_h)
if k.upper() != 'KIND':
print(' ' * depth + "&END", k.upper(), file=fileout_h)
depth = depth-1
elif k != 'name':
print(' ' * depth, k.upper(), v, file=fileout_h)
# global dict3 = {}
[docs] def parse_rec_def(self, cp2k_in_array, index, stop):
dict = {}
dict2 = {}
depth = 0
rec = False
for line in cp2k_in_array[index:]:
index = index+1
if line.startswith('#') or not line.strip():
continue
if 'END' in line:
depth = depth - 1
vals = line.lstrip().split()
if depth < 0:
return dict
elif '&' in line:
depth = depth + 1
if depth == 1:
vals = line.lstrip().split()
key = vals[0].replace('&', '')
if (key == 'KIND'):
key_name = key + "-" + vals[1]
if dict.get(key):
dict[key].append(self.parse_rec_def(cp2k_in_array, index, key_name))
else:
dict[key] = []
dict[key].append(self.parse_rec_def(cp2k_in_array, index, key_name))
else:
rec = True
dict[key] = self.parse_rec_def(cp2k_in_array, index, key)
if len(vals) > 1 and key != 'KIND':
# print(stop + " Add dict[key]['name'] = " + str(vals[1].strip()))
dict[key]['name'] = vals[1].strip()
elif not rec:
vals = line.lstrip().split()
# print(stop + " Add dict[" + str(vals[0]) + "] = " + str(vals[1].strip()))
if (stop == 'COORD'):
if dict2.get('coords_list'):
dict2['coords_list'].append({vals[0]: vals[1:]})
else:
dict2['coords_list'] = []
dict2['coords_list'].append({vals[0]: vals[1:]})
dict = dict2['coords_list']
elif (len(vals) == 2):
if (stop.startswith('KIND-')):
key2, name = stop.split('-')
dict['name'] = name
dict[vals[0]] = vals[1].strip()
else:
dict[vals[0]] = vals[1:]
return dict
[docs] def parse_pdb(self, pdb_file):
dict = {}
# coord = {}
coord = []
cell = {}
max_x = -999.999
max_y = -999.999
max_z = -999.999
min_x = 999.999
min_y = 999.999
min_z = 999.999
for line in open(pdb_file):
# ATOM 2 C7 JZ4 1 21.520 -27.270 -4.230 1.00 0.00
if line[0:4] == 'ATOM' or line[0:6] == 'HETATM':
# atom = line[12:16]
elem = line[77]
x = line[30:38]
y = line[38:46]
z = line[46:54]
if (float(x) > float(max_x)):
max_x = x
if (float(y) > float(max_y)):
max_y = y
if (float(z) > float(max_z)):
max_z = z
if (float(x) < float(min_x)):
min_x = x
if (float(y) < float(min_y)):
min_y = y
if (float(z) < float(min_z)):
min_z = z
# coord[elem] = [x,y,z]
# lcoord = []
coord.append({elem: [x, y, z]})
# coord[elem] = lcoord
box_x = float(max_x) - float(min_x)
box_y = float(max_y) - float(min_y)
box_z = float(max_z) - float(min_z)
box_x = float(f'{box_x:.3f}')
box_y = float(f'{box_y:.3f}')
box_z = float(f'{box_z:.3f}')
box_x = box_x + self.cell_cutoff
box_y = box_y + self.cell_cutoff
box_z = box_z + self.cell_cutoff
# cell['A'] = [str(box_x),'0.000','0.000']
# cell['B'] = ['0.000',str(box_y),'0.000']
# cell['C'] = ['0.000','0.000',str(box_z)]
cell['ABC'] = [str(box_x), str(box_y), str(box_z)]
dict['coord'] = coord
# dict['coords'] = coords
dict['cell'] = cell
return dict
[docs] def merge(self, a, b):
for key_b in b:
key_bu = key_b.upper()
if key_bu in (key_a.upper() for key_a in a):
for key_a in a:
key_au = key_a.upper()
if "-" in key_au:
key_au = key_au.split("-")[0]
if key_au == key_bu:
if isinstance(a[key_a], dict) and isinstance(b[key_b], dict):
self.merge(a[key_a], b[key_b])
elif isinstance(a[key_a], list) and isinstance(b[key_b], list):
if (key_au == 'KIND'):
for idxB, elemB in enumerate(b[key_b]):
done = False
for idxA, elemA in enumerate(a[key_a]):
if elemB['name'] == elemA['name']:
done = True
self.merge(a[key_a][idxA], b[key_b][idxB])
if not done:
a[key_a].append(b[key_b][idxB])
elif a[key_a] == b[key_b]:
pass # same leaf value
else:
a[key_a] = b[key_b]
else:
a[key_b] = b[key_b]
return a
[docs] def replace_coords(self, a, b):
# dict['force_eval'] = {'subsys' : {'coord' : coord } }
print("BioBB_CP2K, replacing coordinates...")
for key in a:
if key.upper() == 'FORCE_EVAL':
for key_2 in a[key]:
if key_2.upper() == 'SUBSYS':
if 'coord' in a[key][key_2]:
a[key][key_2]['coord'] = b['coord']
elif 'Coord' in a[key][key_2]:
a[key][key_2]['Coord'] = b['coord']
elif 'COORD' in a[key][key_2]:
a[key][key_2]['COORD'] = b['coord']
else:
a[key][key_2]['coord'] = b['coord']
if 'cell' in a[key][key_2]:
if 'ABC' in a[key][key_2]['cell']:
a[key][key_2]['cell']['ABC'] = b['cell']['ABC']
elif 'abc' in a[key][key_2]['cell']:
a[key][key_2]['cell']['abc'] = b['cell']['ABC']
elif 'Abc' in a[key][key_2]['cell']:
a[key][key_2]['cell']['Abc'] = b['cell']['ABC']
else:
a[key][key_2]['cell']['abc'] = b['cell']['ABC']
elif 'Cell' in a[key][key_2]:
if 'ABC' in a[key][key_2]['Cell']:
a[key][key_2]['Cell']['ABC'] = b['cell']['ABC']
elif 'abc' in a[key][key_2]['Cell']:
a[key][key_2]['Cell']['abc'] = b['cell']['ABC']
elif 'Abc' in a[key][key_2]['Cell']:
a[key][key_2]['Cell']['Abc'] = b['cell']['ABC']
else:
a[key][key_2]['Cell']['abc'] = b['cell']['ABC']
elif 'CELL' in a[key][key_2]:
if 'ABC' in a[key][key_2]['CELL']:
a[key][key_2]['CELL']['ABC'] = b['cell']['ABC']
elif 'abc' in a[key][key_2]['CELL']:
a[key][key_2]['CELL']['abc'] = b['cell']['ABC']
elif 'Abc' in a[key][key_2]['CELL']:
a[key][key_2]['CELL']['Abc'] = b['cell']['ABC']
else:
a[key][key_2]['CELL']['abc'] = b['cell']['ABC']
else:
a[key][key_2]['cell'] = b['cell']
return a
[docs] def check_data_params(self, out_log, out_err):
""" Checks input/output paths correctness """
# Check input(s)
self.io_dict["in"]["input_inp_path"] = check_input_path(self.io_dict["in"]["input_inp_path"], "input_inp_path", True, out_log, self.__class__.__name__)
self.io_dict["in"]["input_pdb_path"] = check_input_path(self.io_dict["in"]["input_pdb_path"], "input_pdb_path", True, out_log, self.__class__.__name__)
self.io_dict["in"]["input_rst_path"] = check_input_path(self.io_dict["in"]["input_rst_path"], "input_rst_path", True, out_log, self.__class__.__name__)
# Check output(s)
self.io_dict["out"]["output_inp_path"] = check_output_path(self.io_dict["out"]["output_inp_path"], "output_inp_path", False, out_log, self.__class__.__name__)
[docs] def update(self, d, u):
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = self.update(d.get(k, {}), v)
else:
d[k] = v
return d
[docs] @launchlogger
def launch(self):
"""Launches the execution of the Cp2kPrep module."""
# check input/output paths and parameters
self.check_data_params(self.out_log, self.err_log)
# Setup Biobb
if self.check_restart():
return 0
self.stage_files()
# Generating inp file
# Parsing the input PDB file (if any)
if self.io_dict["in"]["input_pdb_path"]:
coord = self.parse_pdb(self.io_dict["in"]["input_pdb_path"])
# print(coord)
# print(json.dumps(coord,indent=4))
# Parsing the input CP2K file (if any)
if self.io_dict["in"]["input_inp_path"] and self.simulation_type:
print("Incompatible inputs found: simulation_type [{0}] and input_inp_path [{1}].".format(self.simulation_type, self.io_dict['in']['input_inp_path']))
print("Will take just the input_inp_path.")
elif (self.simulation_type):
# path_cp2k_in = PurePath(myself.__file__).parent
path_cp2k_in = Path(os.getenv("CONDA_PREFIX")).joinpath('cp2k_aux')
if (self.simulation_type == 'energy'):
self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_energy.inp"))
elif (self.simulation_type == 'geom_opt'):
self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_geom_opt.inp"))
elif (self.simulation_type == 'md'):
self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_md.inp"))
elif (self.simulation_type == 'mp2'):
self.io_dict["in"]["input_inp_path"] = str(Path(path_cp2k_in).joinpath("cp2k_in/cp2k_mp2.inp"))
else:
fu.log(self.__class__.__name__ + ': ERROR: Simulation type %s not defined' % self.simulation_type, self.out_log)
raise SystemExit(self.__class__.__name__ + ': ERROR: Simulation type %s not defined' % self.simulation_type)
else:
print("ERROR: Neither simulation type nor input_inp_path were defined.")
if self.io_dict["in"]["input_inp_path"]:
cp2k_in_array = []
with open(self.io_dict["in"]["input_inp_path"], 'r') as cp2k_in_fh:
# inp_in = self.parse(cp2k_in_fh)
for line in cp2k_in_fh:
cp2k_in_array.append(line)
self.inp_in = self.parse_rec_def(cp2k_in_array, 0, 'Stop')
# print(json.dumps(self.inp_in,indent=4))
if self.io_dict["in"]["input_inp_path"] and self.cp2k_in:
final_dict = self.merge(self.inp_in, self.cp2k_in)
# final_dict = self.merge(self.cp2k_in,self.inp_in)
# print(json.dumps(final_dict,indent=4))
elif self.io_dict["in"]["input_inp_path"] and not self.cp2k_in:
final_dict = self.inp_in
# print(json.dumps(final_dict,indent=4))
elif self.cp2k_in and not self.io_dict["in"]["input_inp_path"]:
final_dict = self.cp2k_in
# print(json.dumps(final_dict,indent=4))
else:
print("HOUSTON....")
if self.io_dict["in"]["input_rst_path"]:
# new_dict={'FORCE_EVAL':{'DFT':{'WFN_RESTART_FILE_NAME': os.path.abspath(self.io_dict["in"]["input_rst_path"]), 'SCF' : {'SCF_GUESS':'RESTART'}}}}
new_dict = {'FORCE_EVAL': {'DFT': {'WFN_RESTART_FILE_NAME': Path(self.io_dict["in"]["input_rst_path"]).resolve(), 'SCF': {'SCF_GUESS': 'RESTART'}}}}
self.update(final_dict, new_dict)
# print(json.dumps(final_dict,indent=4))
final_dict2 = final_dict
if self.io_dict["in"]["input_pdb_path"]:
final_dict2 = self.replace_coords(final_dict, coord)
# print(json.dumps(final_dict,indent=4))
with open(self.io_dict["out"]["output_inp_path"], 'w') as cp2k_out_fh:
self.iterdict(final_dict2, 0, cp2k_out_fh)
self.tmp_files.extend([
self.stage_io_dict.get("unique_dir")
])
self.remove_tmp_files()
self.check_arguments(output_files_created=True, raise_exception=False)
return 0
[docs]def cp2k_prep(output_inp_path: str,
input_inp_path: str = None, input_pdb_path: str = None, input_rst_path: str = None,
properties: dict = None, **kwargs) -> int:
"""Create :class:`Cp2kPrep <cp2k.cp2k_prep.Cp2kPrep>`cp2k.cp2k_prep.Cp2kPrep class and
execute :meth:`launch() <cp2k.cp2k_prep.Cp2kPrep.launch>` method"""
return Cp2kPrep(input_inp_path=input_inp_path,
input_pdb_path=input_pdb_path,
input_rst_path=input_rst_path,
output_inp_path=output_inp_path,
properties=properties).launch()
[docs]def main():
parser = argparse.ArgumentParser(description='Prepares input files for the CP2K QM tool.', formatter_class=lambda prog: argparse.RawTextHelpFormatter(prog, width=99999))
parser.add_argument('--config', required=False, help='Configuration file')
# Specific args
required_args = parser.add_argument_group('required arguments')
required_args.add_argument('--output_inp_path', required=True, help='Output CP2K input inp file. Accepted formats: inp, in, txt.')
parser.add_argument('--input_inp_path', required=False, help='Input configuration file (QM options) (CP2K inp). Accepted formats: inp, in, txt.')
parser.add_argument('--input_pdb_path', required=False, help='Input PDB file. Accepted formats: pdb.')
parser.add_argument('--input_rst_path', required=False, help='Input Restart file (WFN). Accepted formats: wfn.')
args = parser.parse_args()
# config = args.config if args.config else None
args.config = args.config or "{}"
# properties = settings.ConfReader(config=config).get_prop_dic()
properties = settings.ConfReader(config=args.config).get_prop_dic()
# Specific call
cp2k_prep(input_inp_path=args.input_inp_path,
input_pdb_path=args.input_pdb_path,
input_rst_path=args.input_rst_path,
output_inp_path=args.output_inp_path,
properties=properties)
if __name__ == '__main__':
main()