catm-python-lib
Loading...
Searching...
No Matches
dataforming.py
Go to the documentation of this file.
1"""!
2@file dataforming.py
3@version 1
4@author Fumitaka ENDO
5@date 2025-01-28T13:59:12+09:00
6@brief utilities for loading and shaping input data
7"""
8import numpy as np
9import tomllib
10import os
11import argparse
12import pprint
13
def str_to_array(input_str):
    """!
    @brief parse whitespace-separated numbers into a float array
    @param input_str value whose string form holds whitespace-separated numbers
    @return numpy.array([float, float, ... , float]); an empty numpy.array
            when any token cannot be converted to float
    """
    try:
        tokens = str(input_str).split()
        parsed = [float(token) for token in tokens]
    except ValueError:
        # A single bad token invalidates the whole input: report and return empty.
        print(f"Skipping invalid input: {input_str}")
        return np.array([])
    return np.array(parsed)
26
def load_numbers(file_path):
    """!
    @brief load non-negative integers from a text file, one value per line
    @param file_path input data path
    @return list of int ([int, int, ... , int]) in file order; lines that are
            not made up purely of decimal digits (blank lines, signed values,
            floats, text) are skipped
    """
    with open(file_path, 'r') as f:
        # str.isdecimal() is used instead of str.isdigit(): isdigit() also
        # accepts characters such as superscript digits that int() rejects,
        # which would raise ValueError and abort the whole load.
        return [
            int(token)
            for line in f
            if (token := line.strip()).isdecimal()
        ]
36
37def read_toml_file(file_path=None):
38 """!
39 @brief read toml file and return result
40 @param file_path input file path
41 @return return tomllib.load(f)
42 """
43 if os.path.isfile(file_path):
44 try:
45 with open(file_path, "rb") as f:
46 config = tomllib.load(f)
47 return config
48 except (tomllib.TOMLDecodeError, OSError) as e:
49 print(f"An error occurred while loading: {e}")
50 return None
51 else:
52 print(f"File does not exist: {file_path}")
53 return None
54
def read_spe_file(file_path):
    """!
    @brief read spe file (MCA data)
    @param file_path input file path
    @return x and y ([int, int, ... , int], [int, int, ... , int]);
            y holds the counts listed in the $DATA: section and x the matching
            0-based indices. Both are empty when the file has no $DATA: section.

    The line after "$DATA:" is read as "<first channel> <last channel>", so
    (last - first + 1) values are expected. Reading stops early at the end of
    the file, at a blank line, or at the next "$..." section header, so a
    truncated file yields the values that are present instead of failing.
    """
    with open(file_path, 'r') as file:
        lines = [line.strip() for line in file]

    try:
        range_idx = lines.index("$DATA:") + 1
    except ValueError:
        # No data section at all.
        return [], []
    if range_idx >= len(lines):
        # "$DATA:" was the last line: nothing to read.
        return [], []

    channel_range = lines[range_idx].split()
    first_channel = int(channel_range[0])
    last_channel = int(channel_range[1])
    # The range is inclusive on both ends, hence the +1.
    channel_count = last_channel - first_channel + 1

    data_start = range_idx + 1
    y = []
    # Slicing clamps to the end of the file, so short files cannot overrun.
    for entry in lines[data_start:data_start + channel_count]:
        if not entry or entry.startswith("$"):
            break
        y.append(int(entry))

    x = list(range(len(y)))
    return x, y
83
def create_histogram_data_from_points(x, y):
    """!
    @brief expand (value, count) pairs into a flat list of repeated values
    @param x data points along X axis (the values to repeat)
    @param y data points along Y axis (how many times each x value occurs)
    @return list in which every x value appears as many times as its paired y;
            pairing stops at the shorter of the two inputs
    """
    return [
        sample
        for value, count in zip(x, y)
        for sample in [value] * count
    ]
97
def find_peaks(data):
    """!
    @brief find strict local maxima and return their count and indices
    @param data indexable sequence of mutually comparable values
    @return peak number and list of index (int, [int, int, ..., int]);
            (0, []) when data has fewer than three elements
    """
    # Only interior points have two neighbours; the range is empty for inputs
    # shorter than three elements, so no separate length check is needed.
    interior = range(1, len(data) - 1)
    peak_indices = [
        idx
        for idx in interior
        if data[idx] > data[idx - 1] and data[idx] > data[idx + 1]
    ]
    return len(peak_indices), peak_indices
116
117
def _spe_rebin(data, bins):
    """!
    @brief resize a spectrum to exactly `bins` entries
    @param data list of counts
    @param bins number of output entries (must be positive)
    @return data padded with zeros when bins >= len(data); otherwise a list of
            per-bin averages over consecutive chunks of data
    """
    n = len(data)
    if bins >= n:
        return data + [0] * (bins - n)

    bin_size = n / bins
    rebinned = []
    for i in range(bins):
        start = int(i * bin_size)
        # Every output bin must cover at least one input sample.
        end = max(int((i + 1) * bin_size), start + 1)
        chunk = data[start:end]
        rebinned.append(sum(chunk) / len(chunk))
    return rebinned


def _spe_normalize(data, height=10):
    """!
    @brief scale values to integer bar heights in the range 0..height
    @param data list of non-empty values
    @param height bar height assigned to the maximum value
    @return list of rounded bar heights; all zeros when the maximum is 0
    """
    max_val = max(data)
    if max_val == 0:
        return [0] * len(data)
    return [round((v / max_val) * height) for v in data]


def _spe_draw_histogram(data, height=10):
    """!
    @brief print a histogram to the terminal, one text row per height level
    @param data list of bar heights
    @param height number of rows to print, drawn from the top row down
    """
    for level in range(height, 0, -1):
        print(''.join('.' if v >= level else ' ' for v in data))


def check_raed_file_function():
    """!
    @brief check file reader

    Reads the file given on the command line with the reader selected by
    `type` and prints a summary of what was loaded.

    CLI argument:
    @arg type select file type ("spe" or "toml")
    @arg path input file path
    @arg -sd, --spe-draw dump flag for plot histogram in terminal
    @arg -dh, --draw-height maximum height of drawn histogram (default 10)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("type", help="select file type", type=str)
    parser.add_argument("path", help="path input file path", type=str)
    parser.add_argument("-sd","--spe-draw", help="dump flag for plot histogram in terminal", action="store_true")
    parser.add_argument("-dh","--draw-height", help="maximum height of drawn histogram", type=int, default=10)

    args = parser.parse_args()
    file_type: str = args.type
    file_path: str = args.path
    spe_draw_flag: bool = args.spe_draw
    draw_height: int = args.draw_height

    if file_type == "spe":
        data_x, data_y = read_spe_file(file_path)
        if not data_y:
            # Indexing and max() below would fail on an empty spectrum.
            print(f"No spectrum data found in: {file_path}")
            return

        data_length = len(data_x)
        # A single channel has no neighbour to measure a spacing against.
        bin_width = data_x[1] - data_x[0] if data_length > 1 else 0
        max_value = max(data_y)
        max_index = data_y.index(max_value)
        print(f"x range : [{data_x[0]} : {data_x[-1]} ], bin number : {data_length}, bin width :{bin_width}")
        print(f"maximum value : {max_value}, index : {max_index}")

        if spe_draw_flag:
            import shutil

            # Use one text column per bin so the plot spans the terminal width.
            max_bins = shutil.get_terminal_size().columns

            rebinned = _spe_rebin(data_y, max_bins)
            normalized = _spe_normalize(rebinned, height=draw_height)
            _spe_draw_histogram(normalized, height=draw_height)

    elif file_type == "toml":
        toml_output = read_toml_file(file_path)
        print("=== TOML Config Dump ===")
        pprint.pprint(toml_output, sort_dicts=False)

    else:
        # Previously an unknown type was ignored without any feedback.
        print(f"Unsupported file type: {file_type}")