pure memory based implementation of text db reader
This commit is contained in:
parent
0d074ea6ae
commit
23ad84ef30
|
@ -25,13 +25,7 @@ import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
import org.mapdb.BTreeMap;
|
import com.google.common.collect.HashMultimap;
|
||||||
import org.mapdb.DB;
|
|
||||||
import org.mapdb.DB.TreeMapSink;
|
|
||||||
import org.mapdb.DBMaker;
|
|
||||||
import org.mapdb.Serializer;
|
|
||||||
import org.mapdb.serializer.SerializerArrayTuple;
|
|
||||||
|
|
||||||
import com.minres.scviewer.database.AssociationType;
|
import com.minres.scviewer.database.AssociationType;
|
||||||
import com.minres.scviewer.database.DataType;
|
import com.minres.scviewer.database.DataType;
|
||||||
import com.minres.scviewer.database.EventKind;
|
import com.minres.scviewer.database.EventKind;
|
||||||
|
@ -47,23 +41,19 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
|
|
||||||
private Long maxTime=0L;
|
private Long maxTime=0L;
|
||||||
|
|
||||||
Map<String, RelationType> relationTypes=new HashMap<String, RelationType>();
|
Map<String, RelationType> relationTypes = null;
|
||||||
|
|
||||||
DB mapDb;
|
Map<Long, TxStream> txStreams = null;
|
||||||
|
|
||||||
Map<Long, TxStream> txStreams;
|
Map<Long, TxGenerator> txGenerators = null;
|
||||||
|
|
||||||
Map<Long, TxGenerator> txGenerators;
|
Map<Long, ScvTx> transactions = null;
|
||||||
|
|
||||||
BTreeMap<Long, ScvTx> transactions;
|
Map<String, TxAttributeType> attributeTypes = null;
|
||||||
|
|
||||||
BTreeMap<Long[], ScvRelation> relationsIn;
|
HashMultimap<Long, ScvRelation> relationsIn = null;
|
||||||
|
|
||||||
BTreeMap<Long[], ScvRelation> relationsOut;
|
HashMultimap<Long, ScvRelation> relationsOut = null;
|
||||||
|
|
||||||
BTreeMap<String[], TxAttributeType> attributeTypes;
|
|
||||||
|
|
||||||
HashMap<Long, Tx> txCache = new HashMap<>();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long getMaxTime() {
|
public Long getMaxTime() {
|
||||||
|
@ -89,36 +79,14 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
throw new InputFormatException();
|
throw new InputFormatException();
|
||||||
}
|
}
|
||||||
File mapDbFile;
|
relationTypes=new HashMap<String, RelationType>();
|
||||||
|
txStreams = new HashMap<>();
|
||||||
|
txGenerators = new HashMap<>();
|
||||||
|
transactions = new HashMap<>();
|
||||||
|
attributeTypes = new HashMap<>();
|
||||||
|
relationsIn = HashMultimap.create();
|
||||||
|
relationsOut = HashMultimap.create();
|
||||||
try {
|
try {
|
||||||
mapDbFile = File.createTempFile("."+file.getName(), null /*"tmp"*/, null /*file.parentFile*/);
|
|
||||||
} catch (IOException e1) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
mapDbFile.delete(); // we just need a file name
|
|
||||||
mapDbFile.deleteOnExit();
|
|
||||||
mapDb = DBMaker
|
|
||||||
.memoryDirectDB()
|
|
||||||
// .fileDB(mapDbFile)
|
|
||||||
// .fileMmapEnable() // Always enable mmap
|
|
||||||
// .fileMmapEnableIfSupported()
|
|
||||||
// .fileMmapPreclearDisable()
|
|
||||||
.allocateStartSize(512*1024*1024)
|
|
||||||
.allocateIncrement(128*1024*1024)
|
|
||||||
.cleanerHackEnable()
|
|
||||||
.make();
|
|
||||||
TreeMapSink<Long, ScvTx> txSink = mapDb.treeMap("transactions", Serializer.LONG,Serializer.JAVA).createFromSink();
|
|
||||||
relationsIn = mapDb
|
|
||||||
.treeMap("relationsIn", new SerializerArrayTuple(Serializer.LONG, Serializer.LONG) ,Serializer.JAVA)
|
|
||||||
.createOrOpen();
|
|
||||||
relationsOut = mapDb
|
|
||||||
.treeMap("relationsOut", new SerializerArrayTuple(Serializer.LONG, Serializer.LONG) ,Serializer.JAVA)
|
|
||||||
.createOrOpen();
|
|
||||||
attributeTypes = mapDb
|
|
||||||
.treeMap("attributeTypes", new SerializerArrayTuple(Serializer.STRING, Serializer.STRING) ,Serializer.JAVA)
|
|
||||||
.createOrOpen();
|
|
||||||
try {
|
|
||||||
parser.setTransactionSink(txSink).setRelationMaps(relationsIn, relationsOut);
|
|
||||||
parser.parseInput(gzipped?new GZIPInputStream(new FileInputStream(file)):new FileInputStream(file));
|
parser.parseInput(gzipped?new GZIPInputStream(new FileInputStream(file)):new FileInputStream(file));
|
||||||
} catch(IllegalArgumentException|ArrayIndexOutOfBoundsException e) {
|
} catch(IllegalArgumentException|ArrayIndexOutOfBoundsException e) {
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
|
@ -126,10 +94,6 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
transactions=txSink.create();
|
|
||||||
txStreams=new HashMap<>(parser.streamsById);
|
|
||||||
txGenerators=new HashMap<>(parser.generatorsById);
|
|
||||||
txCache.clear();
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,40 +132,17 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
static final Pattern begin_attribute = Pattern.compile("^begin_attribute \\(ID (\\d+), name \"([^\"]+)\", type \"([^\"]+)\"\\)$");
|
static final Pattern begin_attribute = Pattern.compile("^begin_attribute \\(ID (\\d+), name \"([^\"]+)\", type \"([^\"]+)\"\\)$");
|
||||||
static final Pattern end_attribute = Pattern.compile("^end_attribute \\(ID (\\d+), name \"([^\"]+)\", type \"([^\"]+)\"\\)$");
|
static final Pattern end_attribute = Pattern.compile("^end_attribute \\(ID (\\d+), name \"([^\"]+)\", type \"([^\"]+)\"\\)$");
|
||||||
|
|
||||||
HashMap<Long, TxStream> streamsById = new HashMap<>();
|
|
||||||
|
|
||||||
HashMap<Long, TxGenerator> generatorsById = new HashMap<>();
|
|
||||||
|
|
||||||
HashMap<Long, ScvTx> transactionsById = new HashMap<>();
|
|
||||||
|
|
||||||
final TextDbLoader loader;
|
final TextDbLoader loader;
|
||||||
|
|
||||||
BufferedReader reader = null;
|
BufferedReader reader = null;
|
||||||
|
|
||||||
TxGenerator generator=null;
|
TxGenerator generator=null;
|
||||||
|
|
||||||
private TreeMapSink<Long, ScvTx> txSink;
|
|
||||||
|
|
||||||
private BTreeMap<Long[], ScvRelation> relationsIn;
|
|
||||||
|
|
||||||
private BTreeMap<Long[], ScvRelation> relationsOut;
|
|
||||||
|
|
||||||
public TextDbParser(TextDbLoader loader) {
|
public TextDbParser(TextDbLoader loader) {
|
||||||
super();
|
super();
|
||||||
this.loader = loader;
|
this.loader = loader;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TextDbParser setTransactionSink(TreeMapSink<Long, ScvTx> sink) {
|
|
||||||
this.txSink=sink;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TextDbParser setRelationMaps(BTreeMap<Long[], ScvRelation> relationsIn, BTreeMap<Long[], ScvRelation> relationsOut) {
|
|
||||||
this.relationsIn=relationsIn;
|
|
||||||
this.relationsOut=relationsOut;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
void parseInput(InputStream inputStream) throws IOException{
|
void parseInput(InputStream inputStream) throws IOException{
|
||||||
reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
|
reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
|
||||||
String curLine = reader.readLine();
|
String curLine = reader.readLine();
|
||||||
|
@ -214,7 +155,7 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
}
|
}
|
||||||
|
|
||||||
private TxAttributeType getAttrType(String name, DataType dataType, AssociationType type){
|
private TxAttributeType getAttrType(String name, DataType dataType, AssociationType type){
|
||||||
String[] key = new String[] {name, dataType.toString()};
|
String key =name+"-"+dataType.toString();
|
||||||
TxAttributeType res;
|
TxAttributeType res;
|
||||||
if(loader.attributeTypes.containsKey(key)){
|
if(loader.attributeTypes.containsKey(key)){
|
||||||
res=loader.attributeTypes.get(key);
|
res=loader.attributeTypes.get(key);
|
||||||
|
@ -233,35 +174,36 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
DataType type = DataType.valueOf(tokens[3]);
|
DataType type = DataType.valueOf(tokens[3]);
|
||||||
String remaining = tokens.length>5?String.join(" ", Arrays.copyOfRange(tokens, 5, tokens.length)):"";
|
String remaining = tokens.length>5?String.join(" ", Arrays.copyOfRange(tokens, 5, tokens.length)):"";
|
||||||
TxAttributeType attrType = getAttrType(name, type, AssociationType.RECORD);
|
TxAttributeType attrType = getAttrType(name, type, AssociationType.RECORD);
|
||||||
transactionsById.get(id).attributes.add(new TxAttribute(attrType, remaining));
|
loader.transactions.get(id).attributes.add(new TxAttribute(attrType, remaining));
|
||||||
} else if("tx_begin".equals(tokens[0])){
|
} else if("tx_begin".equals(tokens[0])){
|
||||||
Long id = Long.parseLong(tokens[1]);
|
Long id = Long.parseLong(tokens[1]);
|
||||||
Long genId = Long.parseLong(tokens[2]);
|
Long genId = Long.parseLong(tokens[2]);
|
||||||
TxGenerator gen=generatorsById.get(genId);
|
TxGenerator gen=loader.txGenerators.get(genId);
|
||||||
ScvTx tx = new ScvTx(id, gen.stream.getId(), genId, Long.parseLong(tokens[3])*stringToScale(tokens[4]));
|
ScvTx scvTx = new ScvTx(id, gen.stream.getId(), genId, Long.parseLong(tokens[3])*stringToScale(tokens[4]));
|
||||||
transactionsById.put(id, tx);
|
Tx tx = new Tx(loader, scvTx);
|
||||||
loader.maxTime = loader.maxTime>tx.beginTime?loader.maxTime:tx.beginTime;
|
loader.maxTime = loader.maxTime>scvTx.beginTime?loader.maxTime:scvTx.beginTime;
|
||||||
TxStream stream = streamsById.get(gen.stream.getId());
|
TxStream stream = loader.txStreams.get(gen.stream.getId());
|
||||||
stream.setConcurrency(stream.getConcurrency()+1);
|
stream.setConcurrency(stream.getConcurrency()+1);
|
||||||
if(nextLine!=null && nextLine.charAt(0)=='a') {
|
if(nextLine!=null && nextLine.charAt(0)=='a') {
|
||||||
int idx=0;
|
int idx=0;
|
||||||
while(nextLine!=null && nextLine.charAt(0)=='a') {
|
while(nextLine!=null && nextLine.charAt(0)=='a') {
|
||||||
String[] attrTokens=nextLine.split("\\s+");
|
String[] attrTokens=nextLine.split("\\s+");
|
||||||
TxAttribute attr = new TxAttribute(gen.beginAttrs.get(idx), attrTokens[1]);
|
TxAttribute attr = new TxAttribute(gen.beginAttrs.get(idx), attrTokens[1]);
|
||||||
tx.attributes.add(attr);
|
tx.getAttributes().add(attr);
|
||||||
idx++;
|
idx++;
|
||||||
nextLine=reader.readLine();
|
nextLine=reader.readLine();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
txSink.put(tx.id, tx);
|
loader.transactions.put(id, scvTx);
|
||||||
|
gen.getTransactions().add(tx);
|
||||||
} else if("tx_end".equals(tokens[0])){
|
} else if("tx_end".equals(tokens[0])){
|
||||||
Long id = Long.parseLong(tokens[1]);
|
Long id = Long.parseLong(tokens[1]);
|
||||||
ScvTx tx = transactionsById.get(id);
|
ScvTx tx = loader.transactions.get(id);
|
||||||
assert Long.parseLong(tokens[2])==tx.generatorId;
|
assert Long.parseLong(tokens[2])==tx.generatorId;
|
||||||
tx.endTime=Long.parseLong(tokens[3])*stringToScale(tokens[4]);
|
tx.endTime=Long.parseLong(tokens[3])*stringToScale(tokens[4]);
|
||||||
loader.maxTime = loader.maxTime>tx.endTime?loader.maxTime:tx.endTime;
|
loader.maxTime = loader.maxTime>tx.endTime?loader.maxTime:tx.endTime;
|
||||||
TxGenerator gen = generatorsById.get(tx.generatorId);
|
TxGenerator gen = loader.txGenerators.get(tx.generatorId);
|
||||||
TxStream stream = streamsById.get(gen.stream.getId());
|
TxStream stream = loader.txStreams.get(gen.stream.getId());
|
||||||
if(tx.beginTime==tx.endTime)
|
if(tx.beginTime==tx.endTime)
|
||||||
stream.addEvent(new TxEvent(loader, EventKind.SINGLE, id, tx.beginTime));
|
stream.addEvent(new TxEvent(loader, EventKind.SINGLE, id, tx.beginTime));
|
||||||
else {
|
else {
|
||||||
|
@ -279,7 +221,6 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
nextLine=reader.readLine();
|
nextLine=reader.readLine();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
transactionsById.remove(tx.id);
|
|
||||||
} else if("tx_relation".equals(tokens[0])){
|
} else if("tx_relation".equals(tokens[0])){
|
||||||
Long tr2= Long.parseLong(tokens[2]);
|
Long tr2= Long.parseLong(tokens[2]);
|
||||||
Long tr1= Long.parseLong(tokens[3]);
|
Long tr1= Long.parseLong(tokens[3]);
|
||||||
|
@ -287,22 +228,22 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
if(!loader.relationTypes.containsKey(relType))
|
if(!loader.relationTypes.containsKey(relType))
|
||||||
loader.relationTypes.put(relType, RelationTypeFactory.create(relType));
|
loader.relationTypes.put(relType, RelationTypeFactory.create(relType));
|
||||||
ScvRelation rel = new ScvRelation(loader.relationTypes.get(relType), tr1, tr2);
|
ScvRelation rel = new ScvRelation(loader.relationTypes.get(relType), tr1, tr2);
|
||||||
relationsOut.put(new Long[]{tr1, tr2}, rel);
|
loader.relationsOut.put(tr1, rel);
|
||||||
relationsIn.put(new Long[]{tr2, tr1}, rel);
|
loader.relationsIn.put(tr2, rel);
|
||||||
} else if("scv_tr_stream".equals(tokens[0])){
|
} else if("scv_tr_stream".equals(tokens[0])){
|
||||||
Matcher matcher = scv_tr_stream.matcher(curLine);
|
Matcher matcher = scv_tr_stream.matcher(curLine);
|
||||||
if (matcher.matches()) {
|
if (matcher.matches()) {
|
||||||
Long id = Long.parseLong(matcher.group(1));
|
Long id = Long.parseLong(matcher.group(1));
|
||||||
TxStream stream = new TxStream(loader, id, matcher.group(2), matcher.group(3));
|
TxStream stream = new TxStream(loader, id, matcher.group(2), matcher.group(3));
|
||||||
streamsById.put(id, stream);
|
loader.txStreams.put(id, stream);
|
||||||
}
|
}
|
||||||
} else if("scv_tr_generator".equals(tokens[0])){
|
} else if("scv_tr_generator".equals(tokens[0])){
|
||||||
Matcher matcher = scv_tr_generator.matcher(curLine);
|
Matcher matcher = scv_tr_generator.matcher(curLine);
|
||||||
if ((matcher.matches())) {
|
if ((matcher.matches())) {
|
||||||
Long id = Long.parseLong(matcher.group(1));
|
Long id = Long.parseLong(matcher.group(1));
|
||||||
TxStream stream=streamsById.get(Long.parseLong(matcher.group(3)));
|
TxStream stream=loader.txStreams.get(Long.parseLong(matcher.group(3)));
|
||||||
generator=new TxGenerator(id, stream, matcher.group(2));
|
generator=new TxGenerator(id, stream, matcher.group(2));
|
||||||
generatorsById.put(id, generator);
|
loader.txGenerators.put(id, generator);
|
||||||
}
|
}
|
||||||
} else if("begin_attribute".equals(tokens[0])){
|
} else if("begin_attribute".equals(tokens[0])){
|
||||||
Matcher matcher = begin_attribute.matcher(curLine);
|
Matcher matcher = begin_attribute.matcher(curLine);
|
||||||
|
@ -337,12 +278,9 @@ public class TextDbLoader implements IWaveformDbLoader{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ITx getTransaction(Long txId) {
|
public ITx getTransaction(long source) {
|
||||||
if(txCache.containsKey(txId))
|
return new Tx(this, transactions.get(source));
|
||||||
return txCache.get(txId);
|
}
|
||||||
Tx tx = new Tx(this, txId);
|
|
||||||
txCache.put(txId, tx);
|
|
||||||
return tx;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ package com.minres.scviewer.database.text;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import com.minres.scviewer.database.IWaveform;
|
import com.minres.scviewer.database.IWaveform;
|
||||||
|
@ -29,21 +30,21 @@ class Tx implements ITx {
|
||||||
|
|
||||||
private int concurrencyIndex;
|
private int concurrencyIndex;
|
||||||
|
|
||||||
Tx(TextDbLoader loader, Long id){
|
public Tx(TextDbLoader loader, ScvTx scvTx) {
|
||||||
this.loader=loader;
|
this.loader=loader;
|
||||||
this.scvTx = loader.transactions.get(id);
|
this.scvTx = scvTx;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<ITxRelation> getIncomingRelations() {
|
public Collection<ITxRelation> getIncomingRelations() {
|
||||||
NavigableMap<Long[], ScvRelation> rels = loader.relationsIn.prefixSubMap(new Long[]{scvTx.getId()});
|
Set<ScvRelation> rels = loader.relationsIn.get(scvTx.getId());
|
||||||
return rels.values().stream().map(rel -> new TxRelation(loader, rel)).collect(Collectors.toList());
|
return rels.stream().map(rel -> new TxRelation(loader, rel)).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Collection<ITxRelation> getOutgoingRelations() {
|
public Collection<ITxRelation> getOutgoingRelations() {
|
||||||
NavigableMap<Long[], ScvRelation> rels = loader.relationsOut.prefixSubMap(new Long[]{scvTx.getId()});
|
Set<ScvRelation> rels = loader.relationsOut.get(scvTx.getId());
|
||||||
return rels.values().stream().map(rel -> new TxRelation(loader, rel)).collect(Collectors.toList());
|
return rels.stream().map(rel -> new TxRelation(loader, rel)).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -55,6 +56,18 @@ class Tx implements ITx {
|
||||||
return getId().compareTo(o.getId());
|
return getId().compareTo(o.getId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj) return true;
|
||||||
|
if (obj == null || getClass() != obj.getClass()) return false;
|
||||||
|
return this.scvTx.equals(((Tx) obj).scvTx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return scvTx.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "tx#"+getId()+"["+getBeginTime()/1000000+"ns - "+getEndTime()/1000000+"ns]";
|
return "tx#"+getId()+"["+getBeginTime()/1000000+"ns - "+getEndTime()/1000000+"ns]";
|
||||||
|
@ -85,6 +98,10 @@ class Tx implements ITx {
|
||||||
return scvTx.endTime;
|
return scvTx.endTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setEndTime(Long time) {
|
||||||
|
scvTx.endTime=time;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getConcurrencyIndex() {
|
public int getConcurrencyIndex() {
|
||||||
return concurrencyIndex;
|
return concurrencyIndex;
|
||||||
|
|
Loading…
Reference in New Issue