diff --git a/chaincodes/chaincodes.py b/chaincodes/chaincodes.py index 470acc0f1777fc5346d1309ac9db525d66f1f4ff..e9a78e1f546e2e073266c1ae4cbe19418d54d6eb 100644 --- a/chaincodes/chaincodes.py +++ b/chaincodes/chaincodes.py @@ -25,8 +25,8 @@ def feature_df_to_chaincodes(feature_df): x = feature_df.cc_x_pix.tolist() y = feature_df.cc_y_pix.tolist() c = feature_df.cc.tolist() - cdelt1 = [1]*len(feature_df) - cdelt2 = [1]*len(feature_df) + cdelt1 = [1] * len(feature_df) + cdelt2 = [1] * len(feature_df) return dfp.tmap(lambda r: feature_to_chaincode(*r), zip(x, y, c, cdelt1, cdelt2)) @@ -37,9 +37,8 @@ def chaincode_to_skycoord(cc, smap): def feature_to_skycoord(x, y, cc, cdelt1, cdelt2, obs_time): return chaincode_to_skycoord( - feature_to_chaincode( - x, y, cc, cdelt1, cdelt2), - obs_time) + feature_to_chaincode(x, y, cc, cdelt1, cdelt2), obs_time + ) def feature_df_to_skycoords(feature_df, smap) -> tuple[SkyCoord]: @@ -54,26 +53,36 @@ def feature_df_to_skycoords(feature_df, smap) -> tuple[SkyCoord]: smap_header["date-obs"] = d smap_copy = sunpy.map.Map(smap.data, smap_header) smaps.append(smap_copy) - cdelt1 = [1]*len(feature_df) - cdelt2 = [1]*len(feature_df) + cdelt1 = [1] * len(feature_df) + cdelt2 = [1] * len(feature_df) return dfp.tmap( lambda r: rotate_skycoord_to_map(chaincode_to_skycoord(r[0], r[1]), smap), - zip(dfp.tmap(lambda r: feature_to_chaincode(*r), zip(x, y, c, cdelt1, cdelt2)), smaps)) + zip( + dfp.tmap(lambda r: feature_to_chaincode(*r), zip(x, y, c, cdelt1, cdelt2)), + smaps, + ), + ) -def rotate_skycoord_to_map(skycoord: Union[SkyCoord, List[SkyCoord]], smap: sunpy.map.Map) -> SkyCoord: +def rotate_skycoord_to_map( + skycoord: Union[SkyCoord, List[SkyCoord]], smap: sunpy.map.Map +) -> SkyCoord: if isinstance(skycoord, (tuple, list)): if len(skycoord) == 0: return [] - return [rotate_skycoord_to_map(dfp.first(skycoord), smap)] + rotate_skycoord_to_map(dfp.rest(skycoord), smap) + return [ + rotate_skycoord_to_map(dfp.first(skycoord), smap) + ] + rotate_skycoord_to_map(dfp.rest(skycoord), smap) return rotate_skycoord_to_time(skycoord, timestamp=smap.date) -def rotate_skycoord_to_time(skycoord: SkyCoord, timestamp: astropy.time.Time) -> SkyCoord: +def rotate_skycoord_to_time( + skycoord: SkyCoord, timestamp: astropy.time.Time +) -> SkyCoord: warnings.resetwarnings() with warnings.catch_warnings(): - warnings.simplefilter('ignore') + warnings.simplefilter("ignore") sk = solar_rotate_coordinate(skycoord, time=timestamp) return sk @@ -100,24 +109,24 @@ def complete_outline(x, y): # produces every integer between these two points...could be # better but works. - dist = int(np.floor(np.sqrt((y2-y1)**2 + (x2-x1)**2))) + dist = int(np.floor(np.sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2))) if dist > 5000: dist = 1 # TODO: strange overflow for something that should be one: # print(x1, y1, x2, y2, dist) # 342 179 343 178 18446744073709551616 if dist: - step_x = int(round((x2-x1)/dist)) - step_y = int(round((y2-y1)/dist)) + step_x = int(round((x2 - x1) / dist)) + step_y = int(round((y2 - y1) / dist)) _x = x1 _y = y1 - for _ in range(dist-1): + for _ in range(dist - 1): _x += step_x _y += step_y yield _x, _y out_x, out_y = [x[0]], [y[0]] for idx in range(1, x.shape[0]): - for xi, yi in interpolate(x[idx-1], y[idx-1], x[idx], y[idx]): + for xi, yi in interpolate(x[idx - 1], y[idx - 1], x[idx], y[idx]): out_x.append(xi) out_y.append(yi) out_x.append(x[idx]) @@ -134,23 +143,25 @@ def infill_outline(outline: np.ndarray) -> np.ndarray: def diff(a, which="horizontal"): if which == "both": - return ((diff(a, which="horizontal") + diff(a, which="vertical")) > 0.0).astype(np.float64) + return ((diff(a, which="horizontal") + diff(a, which="vertical")) > 0.0).astype( + np.float64 + ) out = np.zeros_like(a) for row_idx in range(1, a.shape[0]): for col_idx in range(1, a.shape[1]): if which == "horizontal": - if a[row_idx, col_idx-1] != a[row_idx, col_idx]: - if a[row_idx, col_idx-1] == 0: + if a[row_idx, col_idx - 1] != a[row_idx, col_idx]: + if a[row_idx, col_idx - 1] == 0: out[row_idx, col_idx] = 1.0 else: - out[row_idx, col_idx-1] = 1.0 + out[row_idx, col_idx - 1] = 1.0 elif which == "vertical": - if a[row_idx-1, col_idx] != a[row_idx, col_idx]: - if a[row_idx-1, col_idx] == 0: + if a[row_idx - 1, col_idx] != a[row_idx, col_idx]: + if a[row_idx - 1, col_idx] == 0: out[row_idx, col_idx] = 1.0 else: - out[row_idx-1, col_idx] = 1.0 + out[row_idx - 1, col_idx] = 1.0 return out @@ -168,36 +179,37 @@ def skycoord_to_mask(skycoord: astropy.coordinates.SkyCoord, smap: sunpy.map.Map if x.shape[0] == 0 or y.shape[0] == 0: return output x, y = complete_outline(x, y) - x[x>=output.shape[0]] = output.shape[0]-1 - y[y>=output.shape[1]] = output.shape[1]-1 + x[x >= output.shape[0]] = output.shape[0] - 1 + y[y >= output.shape[1]] = output.shape[1] - 1 output[x, y] = 1.0 output = infill_outline(output) return output def skycoords_to_mask( - skycoords: Union[tuple[SkyCoord], list[SkyCoord]], - smap: sunpy.map.Map + skycoords: Union[tuple[SkyCoord], list[SkyCoord]], smap: sunpy.map.Map ) -> np.ndarray: return reduce( lambda t, x: skycoord_to_mask(x, smap) + t, dfp.rest(skycoords), - skycoord_to_mask(dfp.first(skycoords), smap)) + skycoord_to_mask(dfp.first(skycoords), smap), + ) def feature_df_to_mask(feature_df, smap): pipeline = dfp.compose( lambda df: feature_df_to_skycoords(df, smap), - lambda sk: skycoords_to_mask(sk, smap)) - return pipeline(feature_df) + lambda sk: skycoords_to_mask(sk, smap), + ) + return pipeline(feature_df) def build_chain(outline: np.ndarray) -> dict[str, Any]: # generate a freeman 8 component chain code def get_neighbours(x, y): connected_neighbours: list[tuple[int, int]] = [] - for xi in range(max(x-1, 0), min(x+2, outline.shape[0])): - for yi in range(max(y-1, 0), min(y+2, outline.shape[1])): + for xi in range(max(x - 1, 0), min(x + 2, outline.shape[0])): + for yi in range(max(y - 1, 0), min(y + 2, outline.shape[1])): if (xi != x or yi != y) and outline[xi, yi] == 1.0: connected_neighbours.append((xi, yi)) return connected_neighbours @@ -210,20 +222,30 @@ def build_chain(outline: np.ndarray) -> dict[str, Any]: graph.add_edge(xy, n) code = [] - chain = reduce(lambda t, x: x if len(x) > len(t) else t, - networkx.chain_decomposition(graph), - next(iter(networkx.chain_decomposition(graph)))) + chain = reduce( + lambda t, x: x if len(x) > len(t) else t, + networkx.chain_decomposition(graph), + next(iter(networkx.chain_decomposition(graph))), + ) for link in chain: last_x, last_y = link[0] xi, yi = link[1] - if xi-last_x == 1 and yi-last_y == 0: code.append("6") - if xi-last_x == 1 and yi-last_y == 1: code.append("5") - if xi-last_x == 0 and yi-last_y == 1: code.append("4") - if xi-last_x == -1 and yi-last_y == 1: code.append("3") - if xi-last_x == -1 and yi-last_y == 0: code.append("2") - if xi-last_x == -1 and yi-last_y == -1: code.append("1") - if xi-last_x == 0 and yi-last_y == -1: code.append("0") - if xi-last_x == 1 and yi-last_y == -1: code.append("7") + if xi - last_x == 1 and yi - last_y == 0: + code.append("6") + if xi - last_x == 1 and yi - last_y == 1: + code.append("5") + if xi - last_x == 0 and yi - last_y == 1: + code.append("4") + if xi - last_x == -1 and yi - last_y == 1: + code.append("3") + if xi - last_x == -1 and yi - last_y == 0: + code.append("2") + if xi - last_x == -1 and yi - last_y == -1: + code.append("1") + if xi - last_x == 0 and yi - last_y == -1: + code.append("0") + if xi - last_x == 1 and yi - last_y == -1: + code.append("7") return { "cc_x_pix": chain[0][1][0], @@ -234,14 +256,14 @@ def build_chain(outline: np.ndarray) -> dict[str, Any]: def pixel_to_arcsec(x, y, smap): - s = smap.pixel_to_world(x*u.pixel, y*u.pixel) + s = smap.pixel_to_world(x * u.pixel, y * u.pixel) return s.Tx.value, s.Ty.value def mask_to_chaincode(mask: np.ndarray, smap: sunpy.map.Map) -> pd.DataFrame: chaincodes: list[dict[str, Any]] = [] labelled_array, n_features = scipy.ndimage.label(mask) - for feature_idx in range(1, n_features+1): + for feature_idx in range(1, n_features + 1): _tmp = labelled_array == feature_idx outline = diff((_tmp == 1).astype(np.int8), "both") code = build_chain(outline) @@ -258,4 +280,3 @@ def mask_to_chaincode(mask: np.ndarray, smap: sunpy.map.Map) -> pd.DataFrame: df.loc[row_idx, "cc_y_arcsec"] = y df["date_obs"] = smap.date.strftime("%Y-%m-%dT%H:%M:%S") # isoformat return df -