import os, sys, arcpy, argparse
from multiprocessing import Pool, cpu_count
sys.path.append("..")
from os import path
# Python 2 idiom: reloading `sys` makes `setdefaultencoding` callable again so
# implicit str/unicode conversions in this script use UTF-8 instead of ASCII.
reload(sys)
sys.setdefaultencoding("utf-8")
# Check out the ArcGIS extension licenses ("spatial" and "3D") at import time.
arcpy.CheckOutExtension("spatial")
arcpy.CheckOutExtension("3D")
def list_files(directory, extension, recursive=False, max_depth=300, current_depth=0, exclude_extension="_template"):
    """
    @Description Collect the paths of files under `directory` whose names end with `extension`
    - param directory :{param} directory to scan
    - param extension :{param} required filename suffix, e.g. ".mxd"
    - param recursive=False :{param} when true, sub-directories are scanned as well
    - param max_depth=300 :{param} calls whose depth exceeds this value return nothing
    - param current_depth=0 :{param} depth of this call; incremented by the recursion
    - param exclude_extension="_template" :{param} a file is skipped when its name without the extension ends with this text
    @returns `{list}` matching paths in `os.listdir` order; empty when the directory cannot be listed
    """
    found = []
    if current_depth > max_depth:
        return found
    try:
        entries = os.listdir(directory)
    except OSError:
        # Directory is missing or unreadable: contribute nothing.
        return found
    for entry in entries:
        entry_path = os.path.join(directory, entry)
        stem = os.path.splitext(entry)[0]
        if os.path.isfile(entry_path) and entry.endswith(extension) and not stem.endswith(exclude_extension):
            found.append(entry_path)
            continue
        if recursive and os.path.isdir(entry_path):
            found += list_files(
                entry_path, extension, recursive, max_depth, current_depth + 1, exclude_extension
            )
    return found
def logger(msg):
    """Forward `msg` to `arcpy.AddMessage`."""
    arcpy.AddMessage(msg)
def is_ascii(input_str):
    """
    @Description Tell whether `input_str` can be decoded as ASCII
    - param input_str :{param} value exposing `.decode` (a byte string)
    @returns `{bool}` False when decoding raises `UnicodeDecodeError`, True otherwise
    """
    try:
        input_str.decode("ascii")
    except UnicodeDecodeError:
        return False
    else:
        return True
def path_decode(mxd_path):
    """
    @Description Decode a non-ASCII path from UTF-8, leaving ASCII paths untouched
    - param mxd_path :{param} path as a byte string
    @returns the original value when it is pure ASCII, otherwise the UTF-8 decoding with undecodable bytes dropped
    """
    if is_ascii(mxd_path):
        return mxd_path
    return mxd_path.decode("utf-8", errors="ignore")
def replace_element_content(mxd, element_name, new_contents):
    """
    @Description Replace the text of a `TEXT_ELEMENT` layout element in an mxd document
    - param mxd :{param} map document instance, e.g. `arcpy.mapping.MapDocument("current")`
    - param element_name :{param} name of the layout element to update
    - param new_contents :{param} text to assign to the element
    @returns `{bool}` True when the text was assigned, False when anything failed (the failure is logged)
    """
    try:
        title_element = arcpy.mapping.ListLayoutElements(mxd, "TEXT_ELEMENT", element_name)[0]
        title_element.text = new_contents
        return True
    except Exception as e:
        # Do not index the exception directly: `e[0]` raises when `args` is
        # empty and string concatenation raises when the first argument is not
        # text, either of which would escape this handler and hide the failure.
        detail = e.args[0] if e.args else e
        logger("modify element.text fail: {0}".format(detail))
        return False
def mxdFixZoom(mxd, fram_name_str, layer_name_str):
    """
    @Description Zoom a data frame so that it is centred on the features of one layer
    - param mxd :{param} map document instance
    - param fram_name_str :{param} name of the data frame
    - param layer_name_str :{param} name of the layer inside that data frame
    @returns `{bool}` True on success, False when the frame/layer lookup or the zoom failed (the failure is logged)
    """
    try:
        df = arcpy.mapping.ListDataFrames(mxd, fram_name_str)[0]
        layer = arcpy.mapping.ListLayers(mxd, layer_name_str, df)[0]
        # Select every feature of the layer, zoom to that selection, then
        # drop the selection again.
        arcpy.SelectLayerByAttribute_management(layer, "NEW_SELECTION")
        try:
            df.zoomToSelectedFeatures()
        finally:
            # Clear the selection even when the zoom fails so it is not left
            # active on the document that gets exported afterwards.
            arcpy.SelectLayerByAttribute_management(layer, "CLEAR_SELECTION")
        return True
    except Exception as e:
        # Report the failure instead of swallowing it, matching
        # replace_element_content; callers still just receive False.
        logger("mxdFixZoom fail ({0} / {1}): {2}".format(fram_name_str, layer_name_str, e))
        return False
def main_old(
    mxd_path,
    output_path,
    sub_title=None,
    data_driven=False,
    main_range=None,
    sub_range=None,
):
    """
    @Description Export an mxd document to a JPEG at 120 dpi (older entry point)
    - param mxd_path :{param} path of the .mxd file
    - param output_path :{param} path of the image to write
    - param sub_title=None :{param} text for the "sub_title" layout element; left unchanged when empty
    - param data_driven=False :{param} data driven page id to show; when falsy the main frame is zoomed instead
    - param main_range=None :{param} layer the "主视图" frame is zoomed to, defaults to "项目范围"
    - param sub_range=None :{param} layer the "图例" frame is zoomed to, defaults to "全局河道"
    @returns `{str}` `output_path`
    """
    mxd = arcpy.mapping.MapDocument(mxd_path)
    if sub_range is None:
        sub_range = "全局河道"
    mxdFixZoom(mxd, "图例", sub_range)
    if data_driven:
        mxd.dataDrivenPages.currentPageID = data_driven
    else:
        if main_range is None:
            main_range = "项目范围"
        # The main frame follows main_range; the original passed sub_range
        # here, which left main_range (and its default) without any effect.
        mxdFixZoom(mxd, "主视图", main_range)
    if sub_title:
        replace_element_content(mxd, "sub_title", sub_title)
    arcpy.mapping.ExportToJPEG(mxd, output_path, resolution=120)
    return output_path
def main(mxd_path, output_path, dpi=120, sub_title=None, fix_range=None, scale=1):
    """
    @Description Export an mxd document to a JPEG image
    - param mxd_path :{param} path of the .mxd file
    - param output_path :{param} path of the image to write, including the extension
    - param dpi=120 :{param} export resolution
    - param sub_title=None :{param} text for the "sub_title" layout element; left unchanged when empty
    - param fix_range=None :{param} list of view corrections applied in order. A string
      "{data frame name}_{layer name}" (e.g. ['主视图_项目范围', '图例_全局河道']) zooms that
      frame to that layer; an int selects the data driven page with that id; a string
      without "_" is skipped. None means no corrections.
    - param scale=1 :{param} factor the first data frame's scale is multiplied by
    @returns `{str}` `output_path`
    """
    mxd = arcpy.mapping.MapDocument(mxd_path)
    for each_range in fix_range or []:
        # Integers have to be recognised before the "_" membership test,
        # because `"_" in <int>` raises TypeError.
        if isinstance(each_range, int):
            mxd.dataDrivenPages.currentPageID = each_range
            continue
        if "_" not in each_range:
            continue
        # Split on the first underscore only, so a layer name that itself
        # contains "_" no longer breaks the two-value unpacking.
        fram_name, layer_name = each_range.split("_", 1)
        mxdFixZoom(mxd, fram_name, layer_name)
    if sub_title:
        replace_element_content(mxd, "sub_title", sub_title)
    df = arcpy.mapping.ListDataFrames(mxd)[0]
    df.scale = df.scale * scale
    arcpy.mapping.ExportToJPEG(mxd, output_path, resolution=dpi)
    return output_path
def main_pool(args):
    """
    @Description Single-argument wrapper around `main` for use with `Pool.map`
    - param args :{param} dict of keyword arguments accepted by `main`
    @returns the value returned by `main`, so `Pool.map` collects the output paths instead of None
    """
    return main(**args)
if __name__ == "__main__":
    # Command-line entry point: export each selected .mxd to "<name>.jpeg".
    parser = argparse.ArgumentParser()
    parser.add_argument("mxd_path", type=str, help="要导出的.mxd文件, 绝对路径")
    parser.add_argument("output_path", help="导出的图片绝对路径,根据后缀名判断格式")
    parser.add_argument("--exclude_extension", default="_template", type=str, help="要排除的后缀名,默认_template")
    parser.add_argument("--scale", default=1, type=float, help="比例尺,默认1,输入1.1即为缩小")
    parser.add_argument("--cpu_pool", default=1, type=int, help="进程池,默认1,建议根据cpu实际情况配置")
    config = parser.parse_args()

    # The help text describes mxd_path as a single .mxd file, while list_files
    # expects a directory and returns [] for a file path. Accept both so a
    # file argument is exported instead of being silently ignored.
    if os.path.isfile(config.mxd_path):
        mxd_list = [config.mxd_path]
    else:
        mxd_list = list_files(config.mxd_path, ".mxd", exclude_extension=config.exclude_extension)

    # Build the per-document keyword arguments once; both modes share them.
    task_list = []
    for each_mxd in mxd_list:
        basename, _ = os.path.splitext(os.path.basename(each_mxd))
        # An empty output_path means "write next to the source document".
        output_dir = config.output_path or os.path.dirname(each_mxd)
        output_path = os.path.join(output_dir, basename + ".jpeg")
        task_list.append({"mxd_path": each_mxd, "output_path": output_path, "scale": config.scale})

    if config.cpu_pool <= 1:
        # Values below 1 fall back to sequential export instead of doing nothing.
        print("single_model")
        for each_task in task_list:
            main(**each_task)
    else:
        print("multiprocessing_model")
        # Never start more workers than the machine reports CPUs.
        cpu_count_value = min(config.cpu_pool, cpu_count())
        pool = Pool(cpu_count_value)
        try:
            result = pool.map(main_pool, task_list)
        finally:
            # Release the worker processes; Python 2's Pool is not a context manager.
            pool.close()
            pool.join()