#!/usr/bin/env python3 # -*- coding: utf-8 -*- # type: ignore # *************************************************************************** # * Copyright (C) 2024, Paul Lutus * # * * # * This program is free software; you can redistribute it and/or modify * # * it under the terms of the GNU General Public License as published by * # * the Free Software Foundation; either version 2 of the License, or * # * (at your option) any later version. * # * * # * This program is distributed in the hope that it will be useful, * # * but WITHOUT ANY WARRANTY; without even the implied warranty of * # * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * # * GNU General Public License for more details. * # * * # * You should have received a copy of the GNU General Public License * # * along with this program; if not, write to the * # * Free Software Foundation, Inc., * # * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * # *************************************************************************** import os, sys, re, operator, datetime, time, subprocess import atexit, serial, platform, zipfile VERSION = 1.7 # NOTE: Versions 1.6 and above are written in Python 3 # set this flag to True for development work DEBUG = False class IcomUtilities: def __init__(self): # User configuration table self.radio_configurations = { "IC-706": ( "IC-706", ("general_hf", "marine_hf", "marine_vhf_wx", "marine_vhf_short", "E"), ), "IC-7000-Boat": ( "IC-7000", ( "general_hf", "marine_hf", "marine_vhf_wx", "general_vhf_uhf", "emergency_beacon", "marine_vhf_long", "family_radio_service", "cb", "E", ), ), "IC-7000-Home": ( "IC-7000", ( "general_hf", "marine_hf", "marine_vhf_wx", "general_vhf_uhf", "emergency_beacon", "marine_vhf_long", "family_radio_service", "cb", "vhf_repeaters_long", "E", ), ), "IC-746": ( "IC-746", ( "general_hf", "marine_hf", "marine_vhf_wx", "marine_vhf_short", "vhf_repeaters_short", "E", ), ), "IC-756Pro": ("IC-756Pro", ("general_hf", "marine_hf", "E")), "IC-R8500": ( "IC-R8500", ( "general_hf", "marine_hf", "marine_vhf_wx", "marine_vhf_long", "general_vhf_uhf", "emergency_beacon", "family_radio_service", "cb", "public_service", "vhf_repeaters_long", "E", ), ), } # change these if needed self.baud_rate = "19200" # Linux: ttyS0, ttyS1 for conventional serial interfaces # or ttyUSB0, ttyUSB1 for USB serial adaptors self.linux_port = "ttyUSB0" # Windows: COM1, COM2, etc self.windows_port = "COM1" # 'use_ods_files' if True uses OpenOffice spreadsheet files # located in 'frequency_data_ods' subdirectory # otherwise uses CSV files # located in 'frequency_data_csv' subdirectory self.use_ods_files = True self.icom_codes = { # Ham Radios: "IC-703": 0x68, "IC-706": 0x4E, "IC-706MKIIG": 0x58, "IC-718": 0x5E, "IC-725": 0x28, "IC-726": 0x30, "IC-728": 0x38, "IC-729": 0x3A, "IC-735": 0x04, "IC-736": 0x40, "IC-746": 0x56, "IC-746Pro": 0x66, "IC-751": 0x1C, "IC-756Pro": 0x5C, "IC-756Pro-II": 0x64, "IC-756Pro-III": 0x6E, "IC-761": 0x1E, "IC-765": 0x2C, "IC-775": 0x46, "IC-781": 0x26, "IC-970": 0x2E, "IC-7000": 0x70, "IC-7200": 0x76, "IC-7600": 0x7A, "IC-7700": 0x74, "IC-7800": 0x6A, # Receivers "IC-R71": 0x1A, "IC-R72": 0x32, "IC-R75": 0x5A, "IC-R7000": 0x08, "IC-R7100": 0x34, "IC-R8500": 0x4A, "IC-R9000": 0x2A, # Marine Radios "M-7000Pro": 0x02, "M-710": 0x01, "M-710RT": 0x03, "M-802": 0x08, "Any": 0x00, # (any Icom marine radio) } self.memory_label_sizes = { "IC-R75": 8, "IC-R8500": 8, "IC-746": 9, "IC-746Pro": 9, "IC-756Pro": 10, "IC-756Pro-II": 10, "IC-756Pro-III": 10, "IC-7000": 9, "IC-7200": 9, "IC-7600": 9, "IC-7700": 9, "IC-7800": 9, } self.bank_sizes = { "IC-R8500": 40, "IC-7000": 99, "IC-7200": 99, "IC-7600": 99, "IC-7700": 99, "IC-7800": 99, } self.modes = { "lsb": 0, "usb": 1, "am": 2, "cw": 3, "rtty": 4, "fm": 5, "wfm": 6, } self.ods_directory = "frequency_data_ods" self.fcsv_directory = "frequency_data_csv" self.ftsv_directory = "frequency_data_tsv" self.fhtml_directory = "frequency_data_html" self.rcsv_directory = "radio_csv_tables" self.rtsv_directory = "radio_tsv_tables" self.html_directory = "radio_html_tables" self.pdf_directory = "radio_pdf_files" for d in ( self.ods_directory, self.html_directory, self.fhtml_directory, self.fcsv_directory, self.ftsv_directory, self.rcsv_directory, self.rtsv_directory, self.pdf_directory, ): if not os.path.exists(d): os.makedirs(d) self.field_names = ( "Bank", "Mem", "Name", "MemTag", "Mode", "RxFreq", "TxFreq", "RxTone", "TxTone", "Comment", # type: ignore "Place", "Call", "Sponsor", "Region", ) self.field_hash = {} for n, name in enumerate(self.field_names): self.field_hash[name] = n self.css_style_block = """ """ self.xml_tab_str = " " def debug_print(self, s, linefeed=True): if DEBUG: sys.stderr.write(s) if linefeed: sys.stderr.write("\n") def read_file(self, path): with open(path) as f: return f.read() def write_file(self, path, data): with open(path, "w") as f: f.write(data) def wrap_tag(self, tag, data, mod=""): if len(mod) > 0: mod = " " + mod return "<%s%s>\n%s\n\n" % (tag, mod, data, tag) def beautify_xhtml(self, data, otab=0): tab = otab xml = [] data = re.sub("\n+", "\n", data) for record in data.split("\n"): record = record.strip() outc = len(re.findall("", record)) inc = len(re.findall(r"<\w", record)) net = inc - outc tab += min(net, 0) xml.append((self.xml_tab_str * tab) + record) tab += max(net, 0) if tab != otab: self.debug_print("Error: tag mismatch: %d\n" % tab) return "\n".join(xml) + "\n" def array_to_xhtml(self, radio_name, array): table = "" row_num = 0 for record in array: row = "" for field in record: field = field.strip() # strip quotes field = re.sub('"', "", field) # replace blank field with   field = re.sub("^$", " ", field) row += self.wrap_tag(("td", "th")[row_num == 0], field) row_num += 1 mod = ('class="cell%d"' % (row_num % 2), "")[row_num == 0] table += self.wrap_tag("tr", row, mod) table = self.wrap_tag( "table", table, 'cellpadding="0" cellspacing="0" border="0"' ) footer = "Created using " footer += self.wrap_tag( "a", "IcomProgrammer", 'href="http://arachnoid.com/IcomProgrammer"' ) footer += " — copyright © 2018, P. Lutus" table += self.wrap_tag("p", footer) dtime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S %Z").strip() title = radio_name + " — " + dtime page = self.wrap_tag("div", table, 'align="center"') page = self.wrap_tag("body", page) head = self.wrap_tag("title", title) page = self.wrap_tag("head", head + self.css_style_block) + page page = self.wrap_tag("html", page) page = self.beautify_xhtml(page) return page def format_floats(self, record): output = [] for field in record: try: v = float(field) field = "%08.4f" % v except: None output.append(field) return output def array_from_csv_file(self, fn): path = os.path.join(self.fcsv_directory, fn + ".csv") data = self.read_file(path) output = [] for line in re.split("\n", data): line = line.strip() if len(line) > 0: line = re.sub('^"(.*?)"$', "\\1", line, 1) record = re.split('","', line) record = self.format_floats(record) output.append(record) return output def encode_write_xsv_file(self, path, array, token, delim): output = "" for record in array: output += delim + token.join(record) + delim + "\n" self.write_file(path, output) def encode_write_pdf_file(self, radio_name, html_data, pdf_path): # try: if True: dtime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S %Z").strip() p = subprocess.Popen( [ "wkhtmltopdf", "--header-left", radio_name, "--header-right", dtime, "-q", "--footer-center", "[page]/[toPage]", "-", "-", ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdoutdata, stderrdata = p.communicate(bytearray(html_data, "utf-8")) self.write_file(pdf_path, str(stdoutdata)) # except Exception as e: # sys.stderr.write('Error: %s (possibly no pdf converter present)\n' % e) def encode_write_xhtml_file(self, radio_name, path, array): page = self.array_to_xhtml(radio_name, array) self.write_file(path, page) return page def add_records_to_array(self, records, array): if len(array) == 0: array.append(self.field_names) self.used_array = [False for i in self.field_names] name_hash = {} header = records.pop(0) for field in self.field_names: if field in header: name_hash[field] = operator.indexOf(header, field) for record in records: output = ["" for s in self.field_names] for name in self.field_names: if name in name_hash: s = record[name_hash[name]].strip() if len(s) > 0: n = self.field_hash[name] output[n] = s self.used_array[n] = True array.append(output) def remove_empty_fields_from_array(self, array): output = [] for record in array: newrec = [] for n, used in enumerate(self.used_array): if used: newrec.append(record[n]) output.append(newrec) return output class OdsToArray: def extract_simple(self, data, tag): return re.findall("(?s)<%s[^/|>]*?>(.*?)" % (tag, tag), str(data)) def extract_complex(self, data, tag): output = [] # must capture open and closed tags, both with repeat specifiers array = re.findall( "(?s)(<%s[^/>]*?/>)|(<%s[^/>]*?>.*?)" % (tag, tag, tag), data ) for tup in array: for datum in tup: n = 1 if re.search("table:number-columns-repeated", datum): # get column-repeat value sn = re.sub( r".*table:number-columns-repeated=\"(\d+)\".*", "\\1", datum ) n = int(sn) if re.search("/>", datum): # repeat empty columns if n > 1: n = min(n, self.record_sz - len(output)) for i in range(n): output.append("") else: # now strip out the residual table tag datum = re.sub("", "", datum) if len(datum) > 0: # repeat data columns for i in range(n): output.append(datum) return output def extract_record(self, row): output = [] n = 0 fields = self.extract_complex(row, "table:table-cell") for field in fields: content = self.extract_simple(field, "text:p") if len(content) > 0: n += 1 output.append(content[0]) else: output.append("") if n > 0: self.record_sz = max(len(output), self.record_sz) return output else: return None def array_from_ods_file(self, fn): path = os.path.join(self.ods_directory, fn + ".ods") zf = zipfile.ZipFile(path) with zf.open("content.xml") as f: data = f.read() zf.close() array = [] self.record_sz = 0 sheets = self.extract_simple(data, "office:spreadsheet") for sheet in sheets: tables = self.extract_simple(sheet, "table:table") for table in tables: rows = self.extract_simple(table, "table:table-row") for row in rows: record = self.extract_record(row) if record and len(record) > 0: record = self.format_floats(record) array.append(record) return array class IcomIO: def __init__(self): self.serial = False self.opsys = platform.system() if re.search("(?i)linux", self.opsys): self.port = "/dev/" + self.linux_port elif re.search("(?i)windows", self.opsys): self.port = self.windows_port else: sys.stderr.write( 'Error: Cannot identify operating system: "%s".\n' % self.opsys ) sys.exit(0) self.set_defaults(0) # Register exit function atexit.register(self.exit) def set_defaults(self, banksize): self.banksize = banksize self.current_vfo = -1 self.mem_bank = -1 self.mem_loc = -1 self.split = False def exit(self): self.close_serial() self.debug_print("IcomIO exit") def close_serial(self): if self.serial: self.serial.flush() self.serial.close() self.serial = False def init_serial(self, force=False): if force or not self.serial: try: self.close_serial() self.serial = serial.Serial( self.port, self.baud_rate, parity=serial.PARITY_NONE, timeout=1000, rtscts=0, ) self.serial.flushOutput() self.serial.flushInput() except: self.serial = False return self.serial != False def read_radio_n(self, n): self.debug_print("Read Radio count %d: " % n, False) count = 0 reply = [] while count < n: s = self.serial.read(1) c = 0 if len(s) > 0: c = s[0] reply.append(c) self.debug_print("%02x " % c, False) count += 1 self.debug_print("") return reply def render_list_as_hex(self, data): s = "[ " for c in data: s += "%02x " % c s += "]" return s def read_radio_s(self): self.debug_print("Read Radio until 0xfd: ", False) count = 0 reply = [] c = 0 while c != 0xFD: s = self.serial.read(1) c = 0 if len(s) > 0: c = s[0] reply.append(c) self.debug_print("%02x " % c, False) count += 1 self.debug_print("") return reply def write_radio(self, com): self.debug_print("Write Radio: ", False) for c in com: self.debug_print("%02x " % c, False) self.serial.write(bytes([c])) self.debug_print("") # discard echo reply self.read_radio_n(len(com)) def read_radio_response(self): reply = self.read_radio_n(6) return reply[4] == 0xFB # meaning no errors def convert_bcd(self, n, count): n = int(n) bcd = [] for i in range(count): bcd.append((n % 10) | ((n // 10) % 10) << 4) n //= 100 return bcd def send_com_core(self, c, data=False): com = [0xFE, 0xFE, self.radio_id, 0xE0, c] if data: com += data com.append(0xFD) self.write_radio(com) return com def send_com(self, c, data=False): com = self.send_com_core(c, data) r = self.read_radio_response() if not r: err = "Error: " + self.render_list_as_hex(com) self.debug_print(err) return r def set_memory_mode(self): self.debug_print("set memory mode") self.send_com(0x08) def set_vfo(self, n): self.debug_print("set vfo: %d" % n) if self.current_vfo != n: self.current_vfo = n self.send_com(0x07) # select VFO mode (required for IC-756) self.send_com(0x07, [0xD0 + n]) # select VFO main/sub (required for IC-756) return self.send_com(0x07, [n]) # select VFO # 01.24.2018 default force = True to avoid # a default setting of split mode def set_split(self, split, force=True): if force or self.split != split: c = (0, 1)[split] self.debug_print("set split mode: %d" % c) r = self.send_com(0x0F, [c]) if r: self.split = split return r def set_memory_bank(self, mb): if mb != self.mem_bank: self.mem_bank = mb bcd = self.convert_bcd(mb, 1) # bcd.reverse() bcd = [0xA0] + bcd self.debug_print( "set memory bank: %d : %s" % (mb, self.render_list_as_hex(bcd)) ) return self.send_com(0x08, bcd) else: return True def set_memory_addr(self, m, banksize): # the R8500 uses zero-based indexing # all other radios use 1-based offset = (1, 0)[self.r8500] if banksize != 0: ma = m % banksize mb = m / banksize r = self.set_memory_bank(mb + offset) if not r: self.debug_print("fail set memory bank: %d" % (mb + offset)) return False bcd = self.convert_bcd(ma + offset, 2) bcd.reverse() else: bcd = self.convert_bcd(m + offset, 2) bcd.reverse() self.debug_print( "set memory address: %d %s" % (m, self.render_list_as_hex(bcd)) ) r = self.send_com(0x08, bcd) # user feedback sys.stdout.write(".") sys.stdout.flush() return r def pad_char(self, s, length, c): while len(s) < length: s += c return s def set_memory_name(self, mem, banksize, mem_tag, radio_tag): self.debug_print("setting memory tag %s" % mem_tag) if radio_tag in self.memory_label_sizes: tag_len = self.memory_label_sizes[radio_tag] else: self.debug_print("Fail mem tag on radio %s" % radio_tag) return # the R8500 uses zero-based indexing # all other radios use 1-based offset = (1, 0)[self.r8500] # special read com for the IC-R8500 read_com = (0, 1)[self.r8500] if banksize != 0: ma = mem % banksize mb = mem / banksize mab = self.convert_bcd(ma + offset, 2) mab.reverse() mbb = self.convert_bcd(mb + offset, 1) mbb.reverse() self.send_com_core(0x1A, [read_com] + mbb + mab) else: mab = self.convert_bcd(mem + offset, 2) mab.reverse() self.send_com_core(0x1A, [read_com] + mab) result = self.read_radio_s() if result: rl = len(result) if rl > tag_len: # limit tag size to max mem_tag = mem_tag[:tag_len] # extend tag length for short tags mem_tag = self.pad_char(mem_tag, tag_len, " ") # convert from chars to numbers mem_tag = [ord(c) for c in mem_tag] delta = tag_len + 1 for n in range(tag_len): result[-(delta - n)] = mem_tag[n] # to transmit the received data block, # must change order of sender and recipient temp = result[2] result[2] = result[3] result[3] = temp # for the IC-R8500, change com if self.r8500: result[5] = 0 self.write_radio(result) r = self.read_radio_response() if not r: err = "Error in set memory name: " + self.render_list_as_hex(result) self.debug_print(err) return r else: self.debug_print( "wrong reply length in set memory name: %d %s" % (rl, self.render_list_as_hex(result)) ) else: self.debug_print("fail from read_radio_s") def set_vfo_freq(self, n): f = (n * 1.0e6) + 0.5 self.debug_print("set VFO freq: %f" % f) n = int(f) bcd = self.convert_bcd(n, 5) self.send_com(0x05, bcd) def set_vfo_tone(self, n, f): f = (f * 10.0) + 0.5 f = int(f) bcd = self.convert_bcd(f, 2) bcd.reverse() bcd = [n] + bcd self.send_com(0x1B, bcd) def set_vfo_mode(self, s): self.debug_print("set VFO mode: %s %d" % (s, self.modes[s])) self.send_com(0x06, [self.modes[s]]) def get_field_by_name(self, record, name): r = record[self.field_hash[name]].strip() if len(r) == 0: r = False else: try: r = float(r) except: None return r def program_radio(self, array, radio_id, banksize, radio_tag, erase_unused=False): if not self.init_serial(): sys.stderr.write("Error: cannot open serial interface, aborting.\n") return self.radio_id = radio_id self.r8500 = radio_tag == "IC-R8500" has_split = self.set_split(False, True) self.debug_print("has_split : %s" % has_split) self.current_vfo = -1 self.set_vfo(0) base = 0 start_time = time.time() total = 0 for mm, record in enumerate(array[1:]): # if(mm > 10): break total += 1 mem = base + mm mode = self.get_field_by_name(record, "Mode") mem_tag = self.get_field_by_name(record, "MemTag") rxf = self.get_field_by_name(record, "RxFreq") txf = self.get_field_by_name(record, "TxFreq") rxt = self.get_field_by_name(record, "RxTone") txt = self.get_field_by_name(record, "TxTone") if rxf and mode: # minimum required self.set_memory_addr(mem, banksize) if has_split: # make sure that a transmit frequency is # specified and nonzero, to prevent # a default split mode if txf and txf > 0: self.set_split(True) self.set_vfo(1) self.set_vfo_freq(txf) self.set_vfo_mode(mode) if txt: # transmit repeater tone self.send_com(0x16, [0x42, 0x1]) # repeater tone on self.set_vfo_tone(0, txt) else: self.set_split(False) if has_split: self.set_vfo(0) self.set_vfo_freq(rxf) self.set_vfo_mode(mode) if rxt: # receiver tone squelch self.send_com(0x16, [0x43, 0x1]) # tone squelch on self.set_vfo_tone(1, rxt) else: self.send_com(0x16, [0x43, 0x0]) # tone squelch off self.debug_print("write VFO to memory") self.send_com(0x09) # write mem if mem_tag: self.set_memory_name(mem, banksize, mem_tag, radio_tag) # break self.set_memory_mode() if erase_unused: r = True # point to first unused location m = base + len(array[1:]) while r: total += 1 r = self.set_memory_addr(m, banksize) if r: self.debug_print("reset memory adress: %d" % m) self.send_com(0x0B) m += 1 self.set_memory_addr(base, banksize) self.close_serial() end_time = time.time() dt = end_time - start_time per_trans_ms = dt * 1000 / total print( "\n\nTotal time: %f seconds for %d transactions, %f milliseconds per transaction\n" % (end_time - start_time, total, per_trans_ms) ) class IcomProgrammer(IcomUtilities, IcomIO, OdsToArray): def __init__(self): IcomUtilities.__init__(self) IcomIO.__init__(self) def process_radio(self, tag, commit=True): s = ("Creating Lists for", "Programming")[commit] print("%s %s ..." % (s, tag)) erase_unused = False master_array = [] if not tag in self.radio_configurations: sys.stderr.write('Error: no configuration for "%s".\n' % tag) else: config = self.radio_configurations[tag] radio_model = config[0] if not radio_model in self.icom_codes: sys.stderr.write( 'Error: no hex code for radio model "%s".\n' % radio_model ) else: hex_code = self.icom_codes[radio_model] if radio_model in self.bank_sizes: bank_size = self.bank_sizes[radio_model] else: bank_size = 0 for path in config[1]: if path == "E": erase_unused = True else: if self.use_ods_files: records = self.array_from_ods_file(path) else: records = self.array_from_csv_file(path) self.add_records_to_array(records, master_array) self.used_array[1] = True self.used_array[0] = bank_size > 0 for i, record in enumerate(master_array[1:]): if bank_size > 0: bank = i / bank_size n = i % bank_size record[0] = "%02d" % bank record[1] = "%02d" % n else: record[1] = "%02d" % (i + 1) if commit: self.program_radio( master_array, hex_code, bank_size, radio_model, erase_unused ) master_array = self.remove_empty_fields_from_array(master_array) self.encode_write_xsv_file( os.path.join(self.rcsv_directory, tag + ".csv"), master_array, '","', '"', ) self.encode_write_xsv_file( os.path.join(self.rtsv_directory, tag + ".tsv"), master_array, "\t", "", ) html_path = os.path.join(self.html_directory, tag + ".html") html_page = self.encode_write_xhtml_file(tag, html_path, master_array) self.encode_write_pdf_file( tag, html_page, os.path.join(self.pdf_directory, tag + ".pdf") ) def create_radio_lists(self): for key in sorted(self.radio_configurations.keys()): self.process_radio(key, False) def create_xsv_files(self): files = os.listdir(self.ods_directory) for fn in files: if re.match("(?i).*\\.ods$", fn): fn = re.sub(r"(?i)(.*)\.ods$", "\\1", fn) records = self.array_from_ods_file(fn) self.encode_write_xsv_file( os.path.join(self.fcsv_directory, fn + ".csv"), records, '","', '"' ) self.encode_write_xsv_file( os.path.join(self.ftsv_directory, fn + ".tsv"), records, "\t", "" ) html_path = os.path.join(self.fhtml_directory, fn + ".html") self.encode_write_xhtml_file(fn, html_path, records) def process(self): menu = [] radio_list = [] for key in sorted(self.radio_configurations.keys()): radio_list.append(key) menu.append("Program " + key) menu.append("Create All Radio Frequency Lists") menu.append("Create All CSV/TSV Frequency Lists") menu.append("Quit") while True: print("Options:") for n, item in enumerate(menu): print(" %d: %s" % (n + 1, item)) lm = len(menu) reply = input("Select (1 - %d): " % lm) # reply = sys.stdin.readline() tag = False try: x = int(reply) except: x = -1 if x < 1 or x > lm: print("Pleae enter a valid number.") else: if x == lm: sys.exit(0) elif re.search("Radio Frequency", menu[x - 1]): self.create_radio_lists() elif re.search("CSV/TSV", menu[x - 1]): self.create_xsv_files() else: tag = radio_list[x - 1] if tag: self.process_radio(tag) if __name__ == "__main__": ip = IcomProgrammer().process()