# Author: Carsten Sachse 26-Jan-2011
# Copyright: EMBL (2010 - 2018), Forschungszentrum Juelich (2019 - 2021)
# License: see license.txt for details
"""
Program to examine micrograph quality by computing a localized power spectrum
using EMAN2's e2scaneval.py and an averaged power spectrum from overlapping
tiles using SPARX' sx_welch_pw2.py
"""
import os
from spring.csinfrastr.csfeatures import Features
from spring.csinfrastr.cslogger import Logger
from spring.csinfrastr.csproductivity import DiagnosticPlot, Temporary, OpenMpi
from spring.csinfrastr.csreadinput import OptHandler
from spring.micprgs.scansplit import Micrograph
from spring.segment2d.segmentexam import SegmentExam
from EMAN2 import Region, EMData, Util, EMNumPy
from sparx import filt_gaussl, image_decimate, model_circle, welch_pw2
import matplotlib.font_manager as font_manager
import matplotlib.image as mpimg
import numpy as np
[docs]class MicrographExamPar(object):
"""
Class to initiate default dictionary with input parameters including help and range values and
status dictionary
"""
def __init__(self):
# package/program identity
self.package = 'emspring'
self.progname = 'micexam'
self.proginfo = __doc__
self.code_files = [self.progname, self.progname + '_mpi']
self.micexam_features = Features()
self.feature_set = self.micexam_features.setup(self)
self.define_parameters_and_their_properties()
self.define_program_states()
[docs] def define_parameters_and_their_properties(self):
self.feature_set = self.micexam_features.set_inp_multiple_micrographs(self.feature_set)
self.feature_set = self.micexam_features.set_output_plot_pattern(self.feature_set, self.progname + '_diag.pdf')
self.feature_set = self.micexam_features.set_pixelsize(self.feature_set)
self.feature_set = self.micexam_features.set_power_tile_size(self.feature_set)
self.feature_set = self.micexam_features.set_tile_overlap(self.feature_set)
self.feature_set = self.set_complete_tile_array_option(self.feature_set)
self.feature_set = self.micexam_features.set_binning_option(self.feature_set, default=True)
self.feature_set = self.micexam_features.set_binning_factor(self.feature_set, binfactor=3)
self.feature_set = self.micexam_features.set_mpi(self.feature_set)
self.feature_set = self.micexam_features.set_ncpus_scan(self.feature_set)
self.feature_set = self.micexam_features.set_temppath(self.feature_set)
[docs] def define_program_states(self):
self.feature_set.program_states['readmic']='Read input micrograph'
self.feature_set.program_states['e2scaneval']='e2scaneval.py computes local powerspectra across micrograph'
self.feature_set.program_states['computepower']='Compute powerspectrum of overlapping tiles'
self.feature_set.program_states['visualize_power']='Visualize powerspectrum'
[docs] def set_complete_tile_array_option(self, feature_set):
inp6 = 'Complete tile array option'
feature_set.parameters[inp6] = bool(False)
feature_set.hints[inp6] = 'A complete array of tiles with no gaps will be generated across the micrograph'
feature_set.level[inp6]='expert'
return feature_set
[docs]class MicrographExamEval(object):
"""
* Class that holds functions for examining micrograph quality
* __init__ Function to read in the entered parameter dictionary and load micrograph
#. Usage: MicrographExam(pardict)
#. Input: pardict = OrderedDict of program parameters
"""
def __init__(self, parset = None):
self.log = Logger()
if parset is not None:
self.feature_set = parset
p = self.feature_set.parameters
self.infile=p['Micrographs']
self.micrograph_files = Features().convert_list_of_files_from_entry_string(self.infile)
self.outfile = p['Diagnostic plot pattern']
self.ori_pixelsize = p['Pixel size in Angstrom']
self.tile_size_A = p['Tile size power spectrum in Angstrom']
self.tile_overlap = p['Tile overlap in percent']
self.fullarropt = p['Complete tile array option']
self.binoption = p['Binning option']
self.binfactor = p['Binning factor']
if self.binfactor == 1 and self.binoption is True:
self.binoption = False
self.temppath=p['Temporary directory']
self.mpi_option = p['MPI option']
self.cpu_count = p['Number of CPUs']
[docs] def bin_micrograph(self, each_micrograph_file, binoption, binfactor, ori_pixelsize, tile_size_A=None, tempdir=None):
if binoption:
if tempdir is not None:
temp_binned = os.path.join(tempdir, os.path.basename(each_micrograph_file))
else:
temp_binned = None
each_micrograph_file, segsizepix = SegmentExam().bin_image_stack_by_binfactor(each_micrograph_file,
binfactor, binned_stack=temp_binned)
pixelsize = ori_pixelsize * binfactor
else:
pixelsize = ori_pixelsize
if tile_size_A is not None:
tile_size = 2 * int(tile_size_A / (2 * pixelsize))
else:
tile_size = None
return each_micrograph_file, pixelsize, tile_size
[docs] def prepare_parameters_for_tiling(self, tilesize, pixelsize):
nx = self.nx
ny = self.ny
if 1.1 < pixelsize < 3.5:
binfactor = int(7.0 / pixelsize + 0.5)
elif pixelsize <= 1.1:
binfactor = 6
elif pixelsize >= 3.5:
binfactor = 1
if binfactor >= 2:
powermic = image_decimate(self.img, binfactor)
tilesize = tilesize / binfactor
nx = powermic.get_xsize()
ny = powermic.get_ysize()
self.log.ilog('Micrograph %s was binned %d x for local power spectra analysis' % (self.infile, binfactor))
self.log.dlog('e2scaneval: tilesize:%g, nx: %g, ny: %g' % (tilesize, nx, ny))
else:
powermic = EMData(self.img) # duplicate img object
powermic.del_attr('bitspersample')
powermic.del_attr('datatype')
sigma = powermic.get_attr('sigma')
return nx, ny, tilesize, powermic, sigma
[docs] def adjust_columns_and_rows_depending_on_array_option(self, tilesize, fullarropt, nx, ny):
if fullarropt is True:
box_count_columns = nx / int(tilesize) # number of boxes in x
separation_columns = tilesize # separation between boxes
box_count_rows = ny / int(tilesize)
separation_rows = tilesize
elif fullarropt is False:
box_count_columns = nx / int(1.5 * tilesize) # number of boxes in x
box_count_rows = ny / int(1.5 * tilesize)
self.log.dlog('{0}, {1}, {2}, {3}'.format(nx, ny, box_count_columns, box_count_rows))
separation_columns = tilesize * 3 // 2+ (nx % (tilesize * 3 // 2)) / box_count_columns - 1 # separation between boxes
separation_rows = tilesize * 3 // 2+ (ny % int(1.5 * tilesize)) / box_count_rows
return box_count_columns, box_count_rows, separation_columns, separation_rows
[docs] def insert_tiles_onto_micrograph(self, tilesize, powermic, sigma, box_count_columns, box_count_rows,
separation_columns, separation_rows):
for x in range(int(box_count_columns)):
for y in range(int(box_count_rows)):
cl = powermic.get_clip(Region(x * separation_columns + (separation_columns - tilesize) / 2, y * \
separation_rows + (separation_rows - tilesize) / 2, tilesize, tilesize))
cl.process_inplace('normalize.edgemean')
cl.process_inplace('math.realtofft')
cl.process_inplace('normalize.edgemean')
try:
cl *= 5.0 * float(sigma) / float(cl.get_attr('sigma'))
except:
pass
#powermic.insert_clip(cl, (10, 10))
powermic.insert_clip(cl, (int(x * separation_columns + (separation_columns - tilesize) // 2), int(y * \
separation_rows + (separation_rows - tilesize) // 2), 0))
self.log.ilog('Power spectrum added in column %d and row %d' % (x, y))
return powermic
[docs] def e2scaneval(self, tilesize, fullarropt, pixelsize):
"""
* Function to use EMAN2's e2scaneval.py
#. Usage: output1 = e2scaneval(tilesize)
#. Input: tilesize in pixels
#. Output: MRC file with 'eval' appended
"""
self.log.fcttolog()
nx, ny, tilesize, powermic, sigma = self.prepare_parameters_for_tiling(tilesize, pixelsize)
box_count_columns, box_count_rows, separation_columns, separation_rows = \
self.adjust_columns_and_rows_depending_on_array_option(tilesize, fullarropt, nx, ny)
powermic = self.insert_tiles_onto_micrograph(tilesize, powermic, sigma, box_count_columns, box_count_rows,
separation_columns, separation_rows)
self.evalpng = self.tempdir + 'eval.png'
powermic.write_image(self.evalpng)
return self.evalpng
[docs]class MicrographExam(MicrographExamExtract):
[docs] def add_entire_micrograph_with_power_tile_overlay(self, evalpng, infile, ax1):
# ax1
evalimg = mpimg.imread(evalpng)
ax1.imshow(evalimg, cmap='gray', interpolation='nearest')
os.remove(evalpng)
self.log.ilog('Micrograph %s included in montage' % infile)
return ax1
[docs] def add_sum_of_overlapping_powerspectra(self, pw2sum, ax2):
# ax2
ax2.set_title('Sum of overlapping power spectra')
twodpow = mpimg.imread(pw2sum)
ax2.imshow(twodpow, cmap='gray', interpolation='nearest')
os.remove(pw2sum)
self.log.ilog('Sum of power spectra included')
return ax2
[docs] def add_rotational_avg_of_above(self, pw2sumrotavg, ax3):
# ax3
ax3.set_title('Rotational avg of above')
two2davg = mpimg.imread(pw2sumrotavg)
ax3.imshow(two2davg, cmap='gray')
os.remove(pw2sumrotavg)
self.log.ilog('Rotational average included in montage')
return ax3
[docs] def add_one_dimensional_power_spectra(self, pw2oned, pw2lineenh, pixelsize, ax4):
# ax4
ax4.set_title('Power spectra profile of above')
ax4.title.set_fontsize(9)
ax4.set_yticks([])
for t in ax4.get_xticklabels():
t.set_fontsize(6)
ax4.set_xlabel('Resolution (1/Angstrom)', fontsize=8)
# linex = np.arange(1, 1 + len(pw2oned))
# linex = linex / (len(linex) * 2 * pixelsize)
linex = np.linspace(0, 1/(2 * pixelsize), len(pw2oned))
pw2oned = pw2oned / max(pw2oned)
ax4.plot(linex, pw2oned, linewidth=.2)
# linex = self.compute_one_over_angstrom_line(pw2lineenh, pixelsize)
linex = np.linspace(0, 1/(2 * pixelsize), len(pw2lineenh))
ax4.plot(linex, pw2lineenh, linewidth=0.2, label='enhanced')
ax4.set_xlim(0, linex[-1])
ax4.legend(loc=(0.2, -0.5), prop=font_manager.FontProperties(size='x-small'))
self.log.ilog('One-dimensional profiles included in montage')
return ax4
[docs] def visualize_power(self, evalpng, pw2sum, pw2sumrotavg, pw2oned, pw2lineenh, pixelsize, infile, outfile):
"""
* Function to visualize power analysis of micrograph
#. Usage: fig = visualize_power(evalpng, pw2sum, pw2sumrotavg, pw2oned, pw2lineenh)
#. Input: evalpng = micrograph montage from e2scaneval.py, pw2sum = computed sum of \
overlapping powerspectra, pw2sumrotavg = rotational average of sum, pw2oned = \
1D profile of rotational average, pw2lineenh = enhanced 1D profile comensated \
against amplitude decay
"""
self.log.fcttolog()
micexam_plot = DiagnosticPlot()
ax1 = micexam_plot.plt.subplot2grid((3,3), (0,0), colspan=2, rowspan=3)
ax2 = micexam_plot.plt.subplot2grid((3,3), (0,2), colspan=1, rowspan=1)
ax3 = micexam_plot.plt.subplot2grid((3,3), (1,2), colspan=1, rowspan=1)
ax4 = micexam_plot.plt.subplot2grid((3,3), (2,2), colspan=1, rowspan=1)
self.fig = micexam_plot.add_header_and_footer(self.feature_set, infile, outfile)
ax1 = self.add_entire_micrograph_with_power_tile_overlay(evalpng, infile, ax1)
ax2 = self.add_sum_of_overlapping_powerspectra(pw2sum, ax2)
ax3 = self.add_rotational_avg_of_above(pw2sumrotavg, ax3)
for plot in (ax1, ax2, ax3):
plot.set_xticks([]); plot.set_yticks([])
plot.title.set_fontsize(9)
ax4 = self.add_one_dimensional_power_spectra(pw2oned, pw2lineenh, pixelsize, ax4)
self.log.ilog('Montage %s written.' %outfile)
self.fig.savefig(outfile, orientation='landscape', dpi=400)
return self.fig
[docs] def examine_scans_computing_total_and_local_powerspectra(self, micrograph_files, outfiles):
self.log.plog(10)
for each_micrograph_index, each_micrograph_file in enumerate(micrograph_files):
each_micrograph_file, self.pixelsize, self.tilesize = self.bin_micrograph(each_micrograph_file,
self.binoption, self.binfactor, self.ori_pixelsize, self.tile_size_A, self.tempdir)
self.img, self.nx, self.ny = Micrograph().readmic(each_micrograph_file)
eval_png = self.e2scaneval(self.tilesize, self.fullarropt, self.pixelsize)
pw2sum, pw2sumrotavg, pw2oned, pw2lineenh = self.compute_power_spectra(self.img, self.pixelsize,
self.tilesize, self.tile_overlap)
self.visualize_power(eval_png, pw2sum, pw2sumrotavg, pw2oned, pw2lineenh, self.pixelsize,
each_micrograph_file, outfiles[each_micrograph_index])
if self.binoption:
os.remove(each_micrograph_file)
self.log.plog((each_micrograph_index + 1) * 100.0 / len(micrograph_files))
[docs] def exam_scans(self):
if len(self.micrograph_files) < self.cpu_count:
self.cpu_count = len(self.micrograph_files)
self.feature_set.parameters['Number of CPUs']=self.cpu_count
OpenMpi().setup_and_start_mpi_version_if_demanded(self.mpi_option, self.feature_set, self.cpu_count)
self.tempdir = Temporary().mktmpdir(self.temppath)
outfiles = Features().rename_series_of_output_files(self.micrograph_files, self.outfile)
self.examine_scans_computing_total_and_local_powerspectra(self.micrograph_files, outfiles)
os.rmdir(self.tempdir)
self.log.endlog(self.feature_set)
[docs]def main():
# Option handling
parset = MicrographExamPar()
mergeparset = OptHandler(parset)
######## Program
micrograph = MicrographExam(mergeparset)
micrograph.exam_scans()
if __name__ == '__main__':
main()