mikeio
st=>start: 输入
mikeio=>operation: 使用mikeio读取dfsu/mesh文件
geopandas=>operation: 使用geopandas转换成shp/geojson
e=>end: 完成
st->mikeio->geopandas->e
mesh to shp
from mikeio import Mesh
import pandas as pd
import geopandas as gpd
# 文件
filename = r"XC-BE.mesh"
msh = Mesh(filename)
# 获取shp对象
shp = msh.to_shapely()
buffer = shp.buffer(0)
# 导出shp文件
gdf = gpd.GeoSeries([buffer])
gdf.to_file('111.shp')
dfsu to shp
pandas
from typing import Optional, List, Union
import numpy as np
from mikeio import Dfsu
import pandas as pd
import geopandas as gpd
def dfsu_to_shp(
dfsu_file: str,
out_file: str,
data_items: Optional[List[Union[str, int]]] = None,
time_step: Optional[int] = None,
value_filter: Optional[List[float]] = None,
value_field: str = "value",
) -> str:
"""将DFSU文件中的标量场数据导出为Shapefile
Args:
dfsu_file: 输入DFSU文件路径
out_file: 输出Shapefile路径
data_items: 数据项名称/索引,默认读取第一个数据项
time_step: 时间步索引,默认最后一步
value_filter: 过滤特定数值范围(闭区间)
value_field: 数据值字段名称
Returns:
成功返回输出路径,失败返回空字符串
"""
try:
# 读取DFSU文件
dfs = Dfsu(dfsu_file)
# 设置默认参数
data_items = data_items or [0]
time_step = time_step if time_step is not None else dfs.n_timesteps - 1
# 读取数据
dataset = dfs.read(items=data_items, time=time_step)
values = np.round(dataset[0].values, 3)
# 构建数据框
df = pd.DataFrame({
value_field: values,
"coord_x": dfs.element_coordinates[:, 0],
"coord_y": dfs.element_coordinates[:, 1]
})
# 数值过滤
if value_filter and len(value_filter) == 2:
min_val, max_val = sorted(value_filter)
df = df[(df[value_field] >= min_val) & (df[value_field] <= max_val)]
# 生成地理数据框
gdf = gpd.GeoDataFrame(
df[[value_field]],
geometry=gpd.points_from_xy(df.coord_x, df.coord_y),
crs=dfs.projection_string
)
# 导出Shapefile
gdf.to_file(out_file)
return out_file
except Exception as e:
print(f"导出失败: {str(e)}")
return ""
if __name__ == "__main__":
tar = r"D:\filed.dfsu"
dfsu_to_shp(tar, "tar.shp", ["Total water depth"], 228)
- geopandas(可能有BUG)
from mikeio import Dfsu,Dfs0,Mesh
import pandas as pd
import geopandas as gpd
filename = "xxx.dfsu"
item = 'Current speed'
step = -1
df = Dfsu(filename)
data = dfsu.read()
item_data = data[item][setp]
item_dataframe = pd.DataFrame({'sp': item_data})
# 获取shp格式
shp = dfsu.to_shapely()
poly_list = [e for e in shp] #???
# 转换为地理数据格式
gdf = gpd.GeoDataFrame(item_dataframe, geometry=poly_list)
# 导出shp
gdf.to_file("xxx.shp")
shp to geojson
- 使用python转换
# shp to GeoJson
import geopandas as gpd
data = gpd.read_file('中国省级区域20200720.shp')
data.to_file("中国省级区域20200720.json", driver='GeoJSON', encoding="utf-8")
# GeoJson to shp
data = gpd.read_file('中国省级区域20200720.json')
data.to_file('中国省级区域20200720', driver='ESRI Shapefile', encoding='utf-8')
- 使用arcmap转换
在arcgis里面使用工具把shp数据处理成geoJSON数据。使用工具为arcgis工具箱 Conversion Tools->JSON->Features To JSON
读取最后一次setp数据
dfs = mikeio.open(dfsu_file)
# 因为数组索引从0开始,所以-1进行修正
last_setp = int(dfs.n_timesteps) - 1
items = ["Current speed", "Current direction"]
all_data = dfs.read(items=items, time=last_setp)