/*******************************************************************************
 * Copyright (c) 2023 MINRES Technologies GmbH
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     IT Just working - initial API and implementation
 *******************************************************************************/
package com.minres.scviewer.database.ftr;

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.compress.compressors.lz4.BlockLZ4CompressorInputStream;
import org.eclipse.collections.impl.map.mutable.UnifiedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.HashMultimap;
import com.minres.scviewer.database.AssociationType;
import com.minres.scviewer.database.DataType;
import com.minres.scviewer.database.EventKind;
import com.minres.scviewer.database.IWaveform;
import com.minres.scviewer.database.IWaveformDb;
import com.minres.scviewer.database.IWaveformDbLoader;
import com.minres.scviewer.database.InputFormatException;
import com.minres.scviewer.database.RelationType;
import com.minres.scviewer.database.RelationTypeFactory;
import com.minres.scviewer.database.tx.ITx;
import com.minres.scviewer.database.tx.ITxAttribute;

import jacob.CborDecoder;
import jacob.CborType;

/**
 * Waveform database loader that reads transaction data from a file through a
 * {@link CborDecoder}.
 *
 * While parsing, the loader fills its stream, generator, transaction and
 * relation tables and announces newly added streams and generators to the
 * registered property-change listeners. Chunk payloads can be re-read later
 * from the still-open input stream via {@link #getChunksAtOffsets(ArrayList)};
 * chunks recorded with a negative offset are LZ4 block-compressed.
 *
 * NOTE(review): this copy of the file appears to have lost the text that sat
 * between a less-than sign and the next greater-than sign (generic type
 * arguments and several loop and method bodies). The affected spots are marked
 * below and the code is left exactly as found - restore it from version
 * control before relying on it.
 */
public class FtrDbLoader implements IWaveformDbLoader {

    /**
     * CBOR type built from the initial byte 0xff. The parse loops keep reading
     * until the next peeked type equals this one.
     */
    static final private CborType break_type = CborType.valueOf(0xff);

    /** The largest end time recorded so far; returned by {@link #getMaxTime()}. */
    private long maxTime = 0L;

    /** String table; entries are fetched by integer index when relation names are resolved. */
    ArrayList strDict = new ArrayList<>();

    /** Input stream of the file being loaded; opened in {@link #load(File)} and reused for chunk reads. */
    FileInputStream fis = null;

    /** Lock acquired on the input file's channel in {@link #load(File)}. */
    FileLock lock = null;

    /** The attr values (not referenced in the intact part of this file). */
    final List attrValues = new ArrayList<>();

    /** Relation types, looked up by relation name. */
    final Map relationTypes = UnifiedMap.newMap();

    /** Transaction streams, keyed by stream id. */
    final Map txStreams = UnifiedMap.newMap();

    /** Transaction generators, keyed by generator id. */
    final Map txGenerators = UnifiedMap.newMap();

    /** Parsed transaction records, keyed by transaction id. */
    final Map transactions = UnifiedMap.newMap();

    /** The attribute types (not referenced in the intact part of this file). */
    final Map attributeTypes = UnifiedMap.newMap();

    /** Relations keyed by the id of the transaction they point to. */
    final HashMultimap relationsIn = HashMultimap.create();

    /** Relations keyed by the id of the transaction they originate from. */
    final HashMultimap relationsOut = HashMultimap.create();

    /** Cache of {@code Tx} wrappers already handed out by {@link #getTransaction(long)}, keyed by transaction id. */
    final Map txCache = UnifiedMap.newMap();

    /** The threads (not referenced in the intact part of this file). */
    List threads = new ArrayList<>();

    /** The file passed to the most recent {@link #load(File)} call; used in log messages. */
    File file;

    private static final Logger LOG = LoggerFactory.getLogger(FtrDbLoader.class);

    /** Dispatches the stream-added and generator-added notifications. */
    protected PropertyChangeSupport pcs = new PropertyChangeSupport(this);

    /**
     * Factor applied to block end times read from the file.
     * NOTE(review): the time units on either side are not visible here - confirm.
     */
    long time_scale_factor = 1000l;

    /**
     * Registers a listener with this loader's {@link PropertyChangeSupport}.
     *
     * @param l the listener to register
     */
    @Override
    public void addPropertyChangeListener(PropertyChangeListener l) {
        pcs.addPropertyChangeListener(l);
    }

    /**
     * Unregisters a listener from this loader's {@link PropertyChangeSupport}.
     *
     * @param l the listener to remove
     */
    @Override
    public void removePropertyChangeListener(PropertyChangeListener l) {
        pcs.removePropertyChangeListener(l);
    }

    /**
     * Gets the max time.
     *
     * @return the current value of {@code maxTime}
     */
    @Override
    public long getMaxTime() {
        return maxTime;
    }

    /**
     * Gets the transaction wrapper for an id, creating and caching it on first use.
     *
     * @param txId the tx id
     * @return the cached or newly created transaction, or null if no record
     *         with that id has been parsed
     */
    public synchronized ITx getTransaction(long txId) {
        // Serve repeat requests from the cache so each id maps to one Tx instance.
        if (txCache.containsKey(txId))
            return txCache.get(txId);
        if(transactions.containsKey(txId)) {
            Tx tx = new Tx(this, transactions.get(txId));
            txCache.put(txId, tx);
            return tx;
        } else {
            return null;
        }
    }

    /**
     * Gets the raw transaction record for an id.
     *
     * @param id the transaction id
     * @return the record stored under that id
     * @throws IllegalArgumentException if no record with that id has been parsed
     */
    public FtrTx getScvTx(long id) {
        if(transactions.containsKey(id))
            return transactions.get(id);
        else
            throw new IllegalArgumentException();
    }

    /**
     * Gets all waveforms known to this loader.
     *
     * @return a new list holding every stream followed by every generator
     */
    @Override
    public Collection getAllWaves() {
        ArrayList ret = new ArrayList<>(txStreams.values());
        ret.addAll(txGenerators.values());
        return ret;
    }

    /**
     * Gets all relation types.
     *
     * @return the values collection of the relation type map (returned
     *         directly, not copied)
     */
    public Collection getAllRelationTypes() {
        return relationTypes.values();
    }

    /**
     * Opens the given file, locks it and parses its content into this loader.
     *
     * The input stream and the lock are kept in fields after the call so that
     * chunks can be read again later.
     *
     * NOTE(review): an {@link IOException} is only logged as a warning and the
     * method then returns normally, so the caller cannot tell that loading
     * failed - confirm this is intended.
     *
     * @param file the file to load
     * @throws InputFormatException if any exception other than an
     *         {@link IOException} occurs; the transaction table is cleared first
     */
    @Override
    public void load(File file) throws InputFormatException {
        // dispose() is not defined in the intact part of this file.
        dispose();
        this.file=file;
        try {
            fis = new FileInputStream(file);
            FileChannel channel = fis.getChannel();
            // Lock the whole file; the third argument requests a shared lock.
            lock=channel.lock(0, Long.MAX_VALUE, true);
            parseInput(new CborDecoder(fis), channel);
        } catch (IOException e) {
            LOG.warn("Problem parsing file "+file.getName()+": " , e);
        } catch (Exception e) {
            LOG.error("Error parsing file "+file.getName()+ ": ", e);
            transactions.clear();
            throw new InputFormatException(e.toString());
        }
    }

    /**
     * Reads the chunk payloads stored at the given file offsets.
     *
     * The sign of an offset selects how the payload is read: a non-negative
     * offset is read as a plain byte string, a negative offset is negated to
     * get the position and its byte string is LZ4 block-decompressed.
     *
     * @param fileOffsets the offsets to read, in the order the results are wanted
     * @return one byte array per offset, in input order
     * @throws InputFormatException if anything goes wrong while reading; the
     *         transaction table is cleared first
     */
    public synchronized List getChunksAtOffsets(ArrayList fileOffsets) throws InputFormatException {
        List ret = new ArrayList<>();
        try {
            FileChannel fc = fis.getChannel();
            for (Long offset : fileOffsets) {
                if(offset>=0) {
                    fc.position(offset);
                    // A fresh decoder is created after every reposition of the channel.
                    CborDecoder parser = new CborDecoder(fis);
                    ret.add(parser.readByteString());
                } else {
                    fc.position(-offset);
                    CborDecoder parser = new CborDecoder(fis);
                    BlockLZ4CompressorInputStream decomp = new BlockLZ4CompressorInputStream(new ByteArrayInputStream(parser.readByteString()));
                    ret.add(decomp.readAllBytes());
                    decomp.close();
                }
            }
        } catch (Exception e) {
            LOG.error("Error parsing file "+file.getName(), e);
            transactions.clear();
            throw new InputFormatException(e.toString());
        }
        return ret;
    }

    /**
     * Parses the transactions contained in one chunk.
     *
     * NOTE(review): the body of this method is truncated in this copy of the
     * file (see the marker below); only the visible prefix is described.
     *
     * @param txStream the stream the chunk belongs to
     * @param blockId the id of the block
     * @param chunk the chunk bytes
     * @throws IOException if decoding fails
     */
    void parseTx(TxStream txStream, long blockId, byte[] chunk) throws IOException {
        CborDecoder cborDecoder = new CborDecoder(new ByteArrayInputStream(chunk));
        long size = cborDecoder.readArrayLength();
        // presumably -1 is the decoder's marker for an indefinite-length array,
        // which would match the read-until-break loop below - confirm against CborDecoder
        assert(size==-1);
        CborType next = cborDecoder.peekType();
        while(next != null && !break_type.isEqualType(next)) {
            // Position of this entry inside the chunk, taken before it is decoded.
            long blockOffset = cborDecoder.getPos();
            long tx_size = cborDecoder.readArrayLength();
            long txId = 0;
            long genId = 0;
            // NOTE(review): SOURCE is truncated inside this loop header. The rest of the
            // loop, the end of parseTx and the header of the method that stores a
            // transaction record appear to be missing; the fragment is kept verbatim.
            for(long i = 0; i scvTx.endTime ? maxTime : scvTx.endTime;
        transactions.put(txId, scvTx);
    }

    /**
     * Parses the attributes of the entry that starts at a given offset of a chunk.
     *
     * NOTE(review): the body of this method is truncated in this copy of the
     * file (see the marker below). What follows the marker belongs to the
     * section switch of the input parser, whose header is missing.
     *
     * @param chunk the chunk bytes
     * @param blockOffset the number of bytes to skip before decoding starts
     * @return the list being filled (the code that fills it is not visible)
     */
    public List parseAtrributes(byte[] chunk, long blockOffset) {
        List ret = new ArrayList<>();
        ByteArrayInputStream bais = new ByteArrayInputStream(chunk);
        bais.skip(blockOffset);
        CborDecoder cborDecoder = new CborDecoder(bais);
        try {
            long tx_size = cborDecoder.readArrayLength();
            // NOTE(review): SOURCE is truncated inside this loop header. The rest of
            // parseAtrributes and everything up to the middle of an earlier switch case
            // of the input parser appear to be missing; the fragment is kept verbatim.
            for(long i = 0; imaxTime?end_time:maxTime;
                    // Remember where the payload starts (non-negative offset = plain chunk),
                    // then read past it.
                    txStreams.get(stream_id).fileOffsets.add(channel.position());
                    cborDecoder.readByteString();
                    break;
                }
                // NOTE(review): in SOURCE everything from the line-comment marker below to the
                // end of that physical line is a single comment, so the handling of cases 13,
                // 14 and 15 is fused into it. It is kept verbatim on one line so that no code
                // token changes. Inside it, the payload position of a compressed chunk is
                // recorded negated, and the second "relations uncompressed" label (case 15)
                // sits next to an LZ4 stream and presumably means "compressed" - confirm.
                case 13: { //tx chunk compressed long len = cborDecoder.readArrayLength(); assert(len==5); long stream_id = cborDecoder.readInt(); cborDecoder.readInt(); // start time of block long end_time = cborDecoder.readInt()*time_scale_factor; cborDecoder.readInt(); // uncompressed size maxTime = end_time>maxTime?end_time:maxTime; txStreams.get(stream_id).fileOffsets.add(0-channel.position()); cborDecoder.readByteString(); break; } case 14: { // relations uncompressed parseRel(new CborDecoder(new ByteArrayInputStream(cborDecoder.readByteString()))); break; } case 15: { // relations uncompressed long sz = cborDecoder.readArrayLength(); assert(sz==2); cborDecoder.readInt(); // uncompressed size parseRel(new CborDecoder(new BlockLZ4CompressorInputStream(new
                    ByteArrayInputStream(cborDecoder.readByteString()))));
                    break;
                }
                }
                next = cborDecoder.peekType();
            }
        } catch(IOException e) {
            long pos = 0;
            // Best effort: the position is only used for the log message, so a failure
            // to read it is ignored and 0 is reported instead.
            try {pos=channel.position(); } catch (Exception ee) {}
            LOG.error("Error parsing file input stream at position" + pos, e);
        }
    }

    /**
     * Parses a dictionary section.
     *
     * NOTE(review): the body of this method is truncated in this copy of the
     * file (see the marker below). What follows the marker is the tail of the
     * relation parser, whose header is missing.
     *
     * @param cborDecoder the decoder positioned at the dictionary map
     * @throws IOException if decoding fails
     */
    private void parseDict(CborDecoder cborDecoder) throws IOException {
        long size = cborDecoder.readMapLength();
        ArrayList lst = new ArrayList<>((int)size);
        // NOTE(review): SOURCE is truncated inside this loop header. The rest of parseDict
        // and the first part of the relation parser appear to be missing; the fragment is
        // kept verbatim.
        for(long i = 0; i3?cborDecoder.readInt():-1;
            // -1 is used for the fiber when the entry has no more than three elements.
            long to_fiber = sz>3?cborDecoder.readInt():-1;
            String rel_name = strDict.get((int)type_id);
            // The default RelationType is created eagerly even when the name is already
            // known, and the visible code does not store it in relationTypes.
            FtrRelation ftrRel = new FtrRelation(relationTypes.getOrDefault(rel_name, RelationTypeFactory.create(rel_name)), from_id, to_id, from_fiber, to_fiber);
            // Index the relation from both ends.
            relationsOut.put(from_id, ftrRel);
            relationsIn.put(to_id, ftrRel);
            next = cborDecoder.peekType();
        }
    }

    /**
     * Registers a stream under its id and notifies listeners with a
     * {@code STREAM_ADDED} event carrying the stream as new value.
     *
     * @param id the stream id
     * @param stream the stream to register
     */
    private void add(Long id, TxStream stream) {
        txStreams.put(id, stream);
        pcs.firePropertyChange(IWaveformDbLoader.STREAM_ADDED, null, stream);
    }

    /**
     * Registers a generator under its id and notifies listeners with a
     * {@code GENERATOR_ADDED} event carrying the generator as new value.
     *
     * @param id the generator id
     * @param generator the generator to register
     */
    private void add(Long id, TxGenerator generator) {
        txGenerators.put(id, generator);
        pcs.firePropertyChange(IWaveformDbLoader.GENERATOR_ADDED, null, generator);
    }

}