diff --git a/chaincodes/__init__.py b/chaincodes/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..9b8bd119c16fe0594745a71b3eb26e9c33cf14f5 --- /dev/null +++ b/chaincodes/__init__.py @@ -0,0 +1 @@ +from .chaincodes import * diff --git a/chaincodes/chaincodes.py b/chaincodes/chaincodes.py new file mode 100644 index 0000000000000000000000000000000000000000..470acc0f1777fc5346d1309ac9db525d66f1f4ff --- /dev/null +++ b/chaincodes/chaincodes.py @@ -0,0 +1,261 @@ +# internal imports +import warnings +from functools import reduce +from typing import Union, List, Any + +# external imports +import astropy.units as u +from astropy.coordinates import SkyCoord +import scipy +import sunpy +import sunpy.net +from sunpy.physics.differential_rotation import solar_rotate_coordinate +import numpy as np +import dfp +import astropy +import networkx +import pandas as pd + + +def feature_to_chaincode(x, y, cc, cdelt1, cdelt2): + return sunpy.net.helio.Chaincode([x, y], cc, xdelta=cdelt1, ydelta=cdelt2) + + +def feature_df_to_chaincodes(feature_df): + x = feature_df.cc_x_pix.tolist() + y = feature_df.cc_y_pix.tolist() + c = feature_df.cc.tolist() + cdelt1 = [1]*len(feature_df) + cdelt2 = [1]*len(feature_df) + return dfp.tmap(lambda r: feature_to_chaincode(*r), zip(x, y, c, cdelt1, cdelt2)) + + +def chaincode_to_skycoord(cc, smap): + x, y = cc.coordinates + return SkyCoord.from_pixel(x, y, smap.wcs) + + +def feature_to_skycoord(x, y, cc, cdelt1, cdelt2, obs_time): + return chaincode_to_skycoord( + feature_to_chaincode( + x, y, cc, cdelt1, cdelt2), + obs_time) + + +def feature_df_to_skycoords(feature_df, smap) -> tuple[SkyCoord]: + x = feature_df.cc_x_pix.tolist() + y = feature_df.cc_y_pix.tolist() + c = feature_df.cc.tolist() + dates = feature_df.date_obs.tolist() + smaps = [] + for d in dates: + smap_header = smap.meta.copy() + smap_header["date_obs"] = d + smap_header["date-obs"] = d + smap_copy = sunpy.map.Map(smap.data, smap_header) + smaps.append(smap_copy) + cdelt1 = [1]*len(feature_df) + cdelt2 = [1]*len(feature_df) + return dfp.tmap( + lambda r: rotate_skycoord_to_map(chaincode_to_skycoord(r[0], r[1]), smap), + zip(dfp.tmap(lambda r: feature_to_chaincode(*r), zip(x, y, c, cdelt1, cdelt2)), smaps)) + + +def rotate_skycoord_to_map(skycoord: Union[SkyCoord, List[SkyCoord]], smap: sunpy.map.Map) -> SkyCoord: + if isinstance(skycoord, (tuple, list)): + if len(skycoord) == 0: + return [] + return [rotate_skycoord_to_map(dfp.first(skycoord), smap)] + rotate_skycoord_to_map(dfp.rest(skycoord), smap) + return rotate_skycoord_to_time(skycoord, timestamp=smap.date) + + +def rotate_skycoord_to_time(skycoord: SkyCoord, timestamp: astropy.time.Time) -> SkyCoord: + warnings.resetwarnings() + + with warnings.catch_warnings(): + warnings.simplefilter('ignore') + sk = solar_rotate_coordinate(skycoord, time=timestamp) + return sk + + +def project_chaincode_to_world(cc, smap): + wcs = smap.wcs + x, y = cc.coordinates + coords = [(x[i], y[i]) for i in range(x.shape[0])] + return wcs.wcs_pix2world(coords, 1) + + +def skycoord_to_pixel(skycoord, smap: sunpy.map.Map): + y, x = skycoord.to_pixel(smap.wcs) + y = np.array(y) + x = np.array(x) + y = y[~np.isnan(y)].astype(np.int64) + x = x[~np.isnan(x)].astype(np.int64) + return x, y + + +def complete_outline(x, y): + def interpolate(x1, y1, x2, y2): + # iterative interpolation between two integer coordinates: + # produces every integer between these two points...could be + # better but works. + + dist = int(np.floor(np.sqrt((y2-y1)**2 + (x2-x1)**2))) + if dist > 5000: + dist = 1 # TODO: strange overflow for something that should be one: + # print(x1, y1, x2, y2, dist) + # 342 179 343 178 18446744073709551616 + if dist: + step_x = int(round((x2-x1)/dist)) + step_y = int(round((y2-y1)/dist)) + _x = x1 + _y = y1 + for _ in range(dist-1): + _x += step_x + _y += step_y + yield _x, _y + + out_x, out_y = [x[0]], [y[0]] + for idx in range(1, x.shape[0]): + for xi, yi in interpolate(x[idx-1], y[idx-1], x[idx], y[idx]): + out_x.append(xi) + out_y.append(yi) + out_x.append(x[idx]) + out_y.append(y[idx]) + + out_x = np.array(out_x).astype(np.int64) + out_y = np.array(out_y).astype(np.int64) + return out_x, out_y + + +def infill_outline(outline: np.ndarray) -> np.ndarray: + return scipy.ndimage.binary_fill_holes(outline).astype(int) + + +def diff(a, which="horizontal"): + if which == "both": + return ((diff(a, which="horizontal") + diff(a, which="vertical")) > 0.0).astype(np.float64) + + out = np.zeros_like(a) + for row_idx in range(1, a.shape[0]): + for col_idx in range(1, a.shape[1]): + if which == "horizontal": + if a[row_idx, col_idx-1] != a[row_idx, col_idx]: + if a[row_idx, col_idx-1] == 0: + out[row_idx, col_idx] = 1.0 + else: + out[row_idx, col_idx-1] = 1.0 + elif which == "vertical": + if a[row_idx-1, col_idx] != a[row_idx, col_idx]: + if a[row_idx-1, col_idx] == 0: + out[row_idx, col_idx] = 1.0 + else: + out[row_idx-1, col_idx] = 1.0 + return out + + +def chaincode_to_mask(coord: sunpy.net.helio.Chaincode, smap): + x, y = coord.coordinates + mask = np.zeros_like(smap.data) + mask[y.astype(int), x.astype(int)] = 1.0 + mask = infill_outline(mask) + return mask + + +def skycoord_to_mask(skycoord: astropy.coordinates.SkyCoord, smap: sunpy.map.Map): + x, y = skycoord_to_pixel(skycoord, smap) + output = np.zeros(smap.data.shape) + if x.shape[0] == 0 or y.shape[0] == 0: + return output + x, y = complete_outline(x, y) + x[x>=output.shape[0]] = output.shape[0]-1 + y[y>=output.shape[1]] = output.shape[1]-1 + output[x, y] = 1.0 + output = infill_outline(output) + return output + + +def skycoords_to_mask( + skycoords: Union[tuple[SkyCoord], list[SkyCoord]], + smap: sunpy.map.Map +) -> np.ndarray: + return reduce( + lambda t, x: skycoord_to_mask(x, smap) + t, + dfp.rest(skycoords), + skycoord_to_mask(dfp.first(skycoords), smap)) + + +def feature_df_to_mask(feature_df, smap): + pipeline = dfp.compose( + lambda df: feature_df_to_skycoords(df, smap), + lambda sk: skycoords_to_mask(sk, smap)) + return pipeline(feature_df) + + +def build_chain(outline: np.ndarray) -> dict[str, Any]: + # generate a freeman 8 component chain code + def get_neighbours(x, y): + connected_neighbours: list[tuple[int, int]] = [] + for xi in range(max(x-1, 0), min(x+2, outline.shape[0])): + for yi in range(max(y-1, 0), min(y+2, outline.shape[1])): + if (xi != x or yi != y) and outline[xi, yi] == 1.0: + connected_neighbours.append((xi, yi)) + return connected_neighbours + + graph = networkx.Graph() + indices = np.where(outline == 1.0) + for xy in zip(indices[0], indices[1]): + graph.add_node(xy) + for n in get_neighbours(xy[0], xy[1]): + graph.add_edge(xy, n) + + code = [] + chain = reduce(lambda t, x: x if len(x) > len(t) else t, + networkx.chain_decomposition(graph), + next(iter(networkx.chain_decomposition(graph)))) + for link in chain: + last_x, last_y = link[0] + xi, yi = link[1] + if xi-last_x == 1 and yi-last_y == 0: code.append("6") + if xi-last_x == 1 and yi-last_y == 1: code.append("5") + if xi-last_x == 0 and yi-last_y == 1: code.append("4") + if xi-last_x == -1 and yi-last_y == 1: code.append("3") + if xi-last_x == -1 and yi-last_y == 0: code.append("2") + if xi-last_x == -1 and yi-last_y == -1: code.append("1") + if xi-last_x == 0 and yi-last_y == -1: code.append("0") + if xi-last_x == 1 and yi-last_y == -1: code.append("7") + + return { + "cc_x_pix": chain[0][1][0], + "cc_y_pix": chain[0][1][1], + "cc": "".join(code), + "cc_length": len(code), + } + + +def pixel_to_arcsec(x, y, smap): + s = smap.pixel_to_world(x*u.pixel, y*u.pixel) + return s.Tx.value, s.Ty.value + + +def mask_to_chaincode(mask: np.ndarray, smap: sunpy.map.Map) -> pd.DataFrame: + chaincodes: list[dict[str, Any]] = [] + labelled_array, n_features = scipy.ndimage.label(mask) + for feature_idx in range(1, n_features+1): + _tmp = labelled_array == feature_idx + outline = diff((_tmp == 1).astype(np.int8), "both") + code = build_chain(outline) + chaincodes.append(code) + df = pd.DataFrame(chaincodes) + df["cdelt1"] = smap.meta["cdelt1"] + df["cdelt2"] = smap.meta["cdelt2"] + df["cc_x_arcsec"] = 0.0 + df["cc_y_arcsec"] = 0.0 + for row_idx in range(df.shape[0]): + x, y = df.loc[row_idx, "cc_x_pix"], df.loc[row_idx, "cc_y_pix"] + x, y = pixel_to_arcsec(x, y, smap) + df.loc[row_idx, "cc_x_arcsec"] = x + df.loc[row_idx, "cc_y_arcsec"] = y + df["date_obs"] = smap.date.strftime("%Y-%m-%dT%H:%M:%S") # isoformat + return df + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..5a03a09ab3c779586f3b3d0b1ebaf83895367fe1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,13 @@ +sunpy +numpy +dfp +scipy +astropy +networkx +pandas +matplotlib +requests +bs4 +lxml +zeep +drms diff --git a/setup.py b/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..2e6d9198edaf3d1a461e490c88c0d9def43e8826 --- /dev/null +++ b/setup.py @@ -0,0 +1,14 @@ +from distutils.core import setup + +with open("requirements.txt") as f: + required_packages = f.read().splitlines() + +setup( + name="chaincodes", + version="1.0", + description="Transformations on the freeman chaincodes", + author="Jay Paul Morgan", + author_email="jay.morgan@univ-tln.fr", + url="https://github.com/jaypmorgan/chaincodes", + packages=["chaincodes"], + install_requires=required_packages)