diff --git a/record-and-playback/core/scripts/utils/gen_webvtt b/record-and-playback/core/scripts/utils/gen_webvtt index 0221a4ccb5b306b73f0abf35a02591d383d02c7e..d9ab086ffa184c21e180bdef2050dcab70a8dc17 100755 --- a/record-and-playback/core/scripts/utils/gen_webvtt +++ b/record-and-playback/core/scripts/utils/gen_webvtt @@ -22,7 +22,7 @@ from lxml import etree from collections import deque from fractions import Fraction import io -from icu import Locale, BreakIterator +from icu import Locale, BreakIterator, UnicodeString import unicodedata import html import logging @@ -35,12 +35,14 @@ logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) + def webvtt_timestamp(ms): frac_s = int(ms % 1000) s = int(ms / 1000 % 60) m = int(ms / 1000 / 60 % 60) h = int(ms / 1000 / 60 / 60) - return '{:02}:{:02}:{:02}.{:03}'.format(h, m, s, frac_s) + return "{:02}:{:02}:{:02}.{:03}".format(h, m, s, frac_s) + class CaptionLine: def __init__(self): @@ -48,10 +50,11 @@ class CaptionLine: self.start_time = 0 self.end_time = 0 + class Caption: def __init__(self, locale): self.locale = locale - self.text = list() + self.text = UnicodeString() self.timestamps = list() self._del_timestamps = list() @@ -63,24 +66,30 @@ class Caption: else: del_timestamp = self.timestamps[i] self._del_timestamps[i] = del_timestamp - logger.debug("Removing text %s at %d:%d, del_ts: %d", - repr(''.join(self.text[i:j])), i, j, del_timestamp) + logger.debug( + "Removing text %s at %d:%d, del_ts: %d", + repr(str(self.text[i:j])), + i, + j, + del_timestamp, + ) if len(text) > 0: - logger.debug("Inserting text %s at %d:%d, ts: %d", - repr(''.join(text)), i, j, timestamp) + logger.debug( + "Inserting text %s at %d:%d, ts: %d", repr(str(text)), i, j, timestamp + ) if i < len(self.timestamps) and timestamp > self.timestamps[i]: timestamp = self._del_timestamps[i] if timestamp is None: if i > 0: - timestamp = self.timestamps[i-1] + timestamp = self.timestamps[i - 1] else: timestamp = self.timestamps[i] logger.debug("Out of order timestamps, using ts: %d", timestamp) self._del_timestamps[i:j] = [del_timestamp] * len(text) - if (i < len(self._del_timestamps)): + if i < len(self._del_timestamps): self._del_timestamps[i] = del_timestamp self.text[i:j] = text @@ -94,9 +103,9 @@ class Caption: stop_pos = 0 start_pos = None for event in events: - if event['name'] == 'record_status': - status = event['status'] - timestamp = event['timestamp'] + if event["name"] == "record_status": + status = event["status"] + timestamp = event["timestamp"] if status and not record: record = True @@ -106,13 +115,14 @@ class Caption: # Find the position of the first character after recording # started start_pos = stop_pos - while start_pos < len(self.timestamps) and \ - self.timestamps[start_pos] < start_ts: + while ( + start_pos < len(self.timestamps) + and self.timestamps[start_pos] < start_ts + ): start_pos += 1 - logger.debug("Replacing characters %d:%d", - stop_pos, start_pos) - self.text[stop_pos:start_pos] = ["\n"] + logger.debug("Replacing characters %d:%d", stop_pos, start_pos) + self.text[stop_pos:start_pos] = "\n" self.timestamps[stop_pos:start_pos] = [stop_ts - ts_offset] start_pos = stop_pos + 1 @@ -130,8 +140,10 @@ class Caption: # Find the position of the first character after recording # stopped, and apply ts offsets stop_pos = start_pos - while stop_pos < len(self.timestamps) and \ - self.timestamps[stop_pos] < stop_ts: + while ( + stop_pos < len(self.timestamps) + and self.timestamps[stop_pos] < stop_ts + ): self.timestamps[stop_pos] -= ts_offset stop_pos += 1 @@ -149,17 +161,16 @@ class Caption: # Apply all of the caption events to generate the full text # with per-character timestamps for event in events: - if event['name'] == 'edit_caption_history': - locale = event['locale'] - i = event['start_index'] - j = event['end_index'] - timestamp = event['timestamp'] - text = event['text'] + if event["name"] == "edit_caption_history": + locale = event["locale"] + i = event["start_index"] + j = event["end_index"] + timestamp = event["timestamp"] + text = UnicodeString(event["text"]) caption = captions.get(locale) if caption is None: - logger.info("Started caption stream for locale '%s'", - locale) + logger.info("Started caption stream for locale '%s'", locale) captions[locale] = caption = cls(locale) caption.apply_edit(i, j, timestamp, text) @@ -175,15 +186,12 @@ class Caption: def split_lines(self, max_length=32): lines = list() - str_text = "".join(self.text) - locale = Locale(self.locale) - logger.debug("Using locale %s for word-wrapping", - locale.getDisplayName(locale)) + logger.debug("Using locale %s for word-wrapping", locale.getDisplayName(locale)) break_iter = BreakIterator.createLineInstance(locale) - break_iter.setText(str_text) - + break_iter.setText(self.text) + line = CaptionLine() line_start = 0 prev_break = 0 @@ -194,39 +202,45 @@ class Caption: status = break_iter.getRuleStatus() line_end = next_break - while line_end > line_start and ( \ - self.text[line_end-1].isspace() or \ - unicodedata.category(self.text[line_end-1]) in ['Cc', 'Mn'] - ): + logger.debug("text len: %d, line end: %d", len(self.text), line_end) + while line_end > line_start and ( + self.text[line_end - 1].isspace() + or unicodedata.category(self.text[line_end - 1]) in ["Cc", "Mn"] + ): line_end -= 1 do_break = False text_section = unicodedata.normalize( - 'NFC', "".join(self.text[line_start:line_end])) + "NFC", str(self.text[line_start:line_end]) + ) timestamps_section = self.timestamps[line_start:next_break] start_time = min(timestamps_section) end_time = max(timestamps_section) if len(text_section) > max_length: if prev_break == line_start: # Over-long string. Just chop it into bits - line_end = next_break = prev_break + max_length + next_break = prev_break + max_length + continue else: next_break = prev_break do_break = True else: # Status [100,200) indicates a required (hard) line break - if next_break >= len(self.text) or \ - (status >= 100 and status < 200): + if next_break >= len(self.text) or (status >= 100 and status < 200): line.text = text_section line.start_time = start_time line.end_time = end_time do_break = True if do_break: - logger.debug("text section %d -> %d (%d): %s", - line.start_time, line.end_time, - len(line.text), repr(line.text)) + logger.debug( + "text section %d -> %d (%d): %s", + line.start_time, + line.end_time, + len(line.text), + repr(line.text), + ) lines.append(line) line = CaptionLine() line_start = next_break @@ -242,7 +256,7 @@ class Caption: def write_webvtt(self, f): # Write magic - f.write("WEBVTT\n\n".encode('utf-8')) + f.write("WEBVTT\n\n".encode("utf-8")) lines = self.split_lines() @@ -297,49 +311,48 @@ class Caption: if next_start_time - end_time < 500: end_time = next_start_time - f.write("{} --> {}\n".format( - webvtt_timestamp(start_time), - webvtt_timestamp(end_time) - ).encode('utf-8')) - f.write(html.escape(text, quote=False).encode('utf-8')) - f.write("\n\n".encode('utf-8')) + f.write( + "{} --> {}\n".format( + webvtt_timestamp(start_time), webvtt_timestamp(end_time) + ).encode("utf-8") + ) + f.write(html.escape(text, quote=False).encode("utf-8")) + f.write("\n\n".encode("utf-8")) def caption_desc(self): locale = Locale(self.locale) - return { - "locale": self.locale, - "localeName": locale.getDisplayName(locale) - } + return {"locale": self.locale, "localeName": locale.getDisplayName(locale)} def parse_record_status(event, element): - userId = element.find('userId') - status = element.find('status') + userId = element.find("userId") + status = element.find("status") + + event["name"] = "record_status" + event["user_id"] = userId.text + event["status"] = status.text == "true" - event['name'] = 'record_status' - event['user_id'] = userId.text - event['status'] = (status.text == 'true') def parse_caption_edit(event, element): - locale = element.find('locale') - text = element.find('text') - startIndex = element.find('startIndex') - endIndex = element.find('endIndex') - localeCode = element.find('localeCode') - - event['name'] = 'edit_caption_history' - event['locale_name'] = locale.text + locale = element.find("locale") + text = element.find("text") + startIndex = element.find("startIndex") + endIndex = element.find("endIndex") + localeCode = element.find("localeCode") + + event["name"] = "edit_caption_history" + event["locale_name"] = locale.text if localeCode is not None: - event['locale'] = localeCode.text + event["locale"] = localeCode.text else: # Fallback for missing 'localeCode' - event['locale'] = "en" + event["locale"] = "en" if text.text is None: - event['text'] = list() + event["text"] = "" else: - event['text'] = list(text.text) - event['start_index'] = int(startIndex.text) - event['end_index'] = int(endIndex.text) + event["text"] = text.text + event["start_index"] = int(startIndex.text) + event["end_index"] = int(endIndex.text) def parse_events(directory="."): @@ -353,22 +366,22 @@ def parse_events(directory="."): event = {} # Convert timestamps to be in seconds from recording start - timestamp = int(element.attrib['timestamp']) + timestamp = int(element.attrib["timestamp"]) if not start_time: start_time = timestamp timestamp = timestamp - start_time # Only need events from these modules - if not element.attrib['module'] in ['CAPTION','PARTICIPANT']: + if not element.attrib["module"] in ["CAPTION", "PARTICIPANT"]: continue - event['name'] = name = element.attrib['eventname'] - event['timestamp'] = timestamp + event["name"] = name = element.attrib["eventname"] + event["timestamp"] = timestamp - if name == 'RecordStatusEvent': + if name == "RecordStatusEvent": parse_record_status(event, element) have_record_events = True - elif name == 'EditCaptionHistoryEvent': + elif name == "EditCaptionHistoryEvent": parse_caption_edit(event, element) else: logger.debug("Unhandled event: %s", name) @@ -381,25 +394,31 @@ def parse_events(directory="."): if not have_record_events: # Add a fake record start event to the events list event = { - 'name': 'record_status', - 'user_id': None, - 'timestamp': 0, - 'status': True - } + "name": "record_status", + "user_id": None, + "timestamp": 0, + "status": True, + } events.appendleft(event) return events + if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Generate WebVTT files from BigBlueButton captions", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("-i", "--input", metavar="PATH", - help="input directory with events.xml file", - default=os.curdir) - parser.add_argument("-o", "--output", metavar="PATH", - help="output directory", - default=os.curdir) + description="Generate WebVTT files from BigBlueButton captions", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "-i", + "--input", + metavar="PATH", + help="input directory with events.xml file", + default=os.curdir, + ) + parser.add_argument( + "-o", "--output", metavar="PATH", help="output directory", default=os.curdir + ) args = parser.parse_args() rawdir = args.input @@ -419,6 +438,6 @@ if __name__ == "__main__": filename = os.path.join(outputdir, "captions.json") logger.info("Writing captions index file to %s", filename) - caption_descs = [ caption.caption_desc() for caption in captions.values() ] + caption_descs = [caption.caption_desc() for caption in captions.values()] with open(filename, "w") as f: json.dump(caption_descs, f)