storm笔记:Trident应用中说了下Trident的使用,这里说下Trident几种状态的变化及其对应API的使用。

本文内容来自Trident State,部分内容根据实际情况做出修改。

Trident中有对状态数据进行读取和写入操作的一流抽象工具。状态既可以保存在拓扑内部,比如保存在内容中并由HDFS存储,也可以通过外部存储(比如Memcached或Cassandra)存储在数据库中。而对于Trident的API而言,这两种机制没有任何区别。

Trident以容错的方式管理状态,以便在重试或失败时的状态更新是幂等的。在大数据处理中,数据处理的幂等性是非常重要的一个指标,这样能够保证每个消息即使处理了多次,结果也像是只处理了一次一样。

在进行状态更新时可能需要各种级别的容错能力,在这之前,我们来看一个例子说明实现“恰好一次”语义所需的技巧。比如,正在对流中的数据进行计数聚合操作,每次处理新的元组时,都会将运行的计数结果存储在数据库中。

如果发生故障时,元组将重新执行计数操作。这就会在执行状态更新时出现问题,因为这个时候不知道是不是已经更新过该元组状态。也许还没有处理该元组数据,这个时候就需要增加计数。也许已经处理该元组,并成功增加计数,但是在下一步的时候出现问题,这种情况下,就不应该增加计数。也有可能是处理元组正常,更新计数是异常,这个时候就需要更新计数。

所以说,如果只是在数据库中存储计数信息,就不知道元组是否已经处理过。因此,就需要更多的信息作为辅助。Trident提供了下面三个性质,来实现“恰好一次”的处理:

  1. 元组都是以小批次处理
  2. 每批元组都会给出一个唯一ID,称为事务ID(transaction id,txid)。如果批次重复处理,txid也会相同。
  3. 状态的更新操作是按照元组批次的顺序执行的。也就是说,在批次2状态更新成功之前,不会进行批次3的状态更新。

根据这些特性,就可以通过检查到该元组的批次是否已被处理,并根据检测结果采取适当的操作更新状态了。采取的具体操作取决于Spout的类型。Spout有三种类型:“非事务型(non-transactional)”,“事务型(transactional)”和“不透明事务型(opaque transactional)”。对应的容错能力也是三种:“非事务”,“事务”和“不透明事务”。下面来看看Spout的各个类型及对应的容错能力。

事务型Spout

Trident是按照批次发送元组进行处理的,每个批次的元组被赋予唯一的事务ID。Spout的特性根据他们所提供容错性保证机制来决定的,而且这种机制也会对每个批次发生作用。事务型Spout有如下特性:

  1. 每个批次的txid不变,对于一个特定的txid,重复执行时,它所包含的元组数据与第一次完全相同。
  2. 元组只会在一个批次出现,不会重复(某个元组只会出现在一个批次中,不会出现在多个批次中)。
  3. 每个元组都会出现一次(不会遗漏任何的元组数据)

这是最简单最容易理解的一种Spout类型,数据流被分割成固定的批次。storm中有与Kafka集成的事务型Spout的扩展,代码在这里

既然事务型Spout这么简单易懂,为什么不在Trident中完全使用事务型Spout呢?其实就在于它的容错能力。比如,TransactionalTridentKafkaSpout的工作方式是,同一个txid的批次中将包含kafka所有分区的元组。一旦某个批次发出后,出现异常,需要重新发出,就需要完全相同的元组集合才能满足事务型Spout要求的语义。但是这个时候,kafka某个节点异常(节点关闭或分区不可用),就无法获取完全相同的的一批元组,那整个拓扑就会应为第3条语义(批次按顺序执行)停止。

这就是要有“不透明事务型”Spout的原因了,它能够容忍数据源节点丢失,而且又能保证数据恰好被操作一次。

注:对kafka比较熟悉的应该会想到,如果某一个topic支持复制,那即使一个节点不可用,还会有其他复制节点顶上,那TransactionalTridentKafkaSpout也能够避免上面的问题。

下面继续看看如何设计一个支持恰好一次特性的“事务型”Spout语义(简单的说就是同一个txid对应的批次元组数据完全一致)的状态实现,这种状态称为“事务型状态”。

比如,现在有一个单词计数的拓扑,需要将单词计数存储在key/value数据库中。key是单词,value中包含单词数量。另外,为了确定同一批次元组是否已经被执行,需要将txid也一同存储在value中。这样,当需要更新单词数量的时候,先比较txid是否相同,如果相同,就跳过更新。如果不同,就更新计数。

考虑这个为什么它工作的例子。 假设您正在处理由以下批次元组组成的txid 3:

比如,要处理一个txid是3的一批元组:

["man"]
["man"]
["dog"]

目前数据库中存储的数据为:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

在这个时候,发现“man”对应的txid是1,当前的txid是3,就可以更新了。然后“dog”对应的txid是3,说明同一批次的元组数据已经发送过了,就不需要更新。从这点可以看出,txid是3的批次元组是重复发送的,在更新“dog”数量后,在更新“man”数量前,出现了错误。最后的结果就是:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=10, txid=2]

不透明事务型Spout

前面已经提过,不透明事务型Spout不能保证相同txid对应的批次中的元组数据完全一致。其特点如下:

  1. 每个元组都会在有且仅有一个批次中处理成功。

[OpaqueTridentKafkaSpout](http://github.com/apache/storm/tree/v1.1.0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java)具有这种特性,同时对kafka节点异常有很好的容错性。OpaqueTridentKafkaSpout在发送一个批次元组的时候,会从上次成功之后的位置开始发送,这样就能够保证元组不会漏发或重发。

基于上面的特点,不透明事务型Spout就不同通过txid来直接判断是否可以跳过状态更新,因为具有相同txid的批次中元组可能发生了变化。

这就需要存储更多的状态信息了,而不仅仅是一个结果和一个txid了,还需要存储前一个结果值。

比如,当前批次的计数是2,需要进行一次状态更新,数据库中的数据如下:

{
  "value": 4,
  "prevValue": 1,
  "txid": 2
}

如果当前的txid是3,与数据库中的不同。在这种情况下,需要将prevValue的值该为value的值,value的值增加2,更新txid为3,最后的结果就是:

{
  "value": 6,
  "prevValue": 4,
  "txid": 3
}

如果当前的txid是2,等于数据库中的txid。因为txid相同,说明上一次txid为2的批次处理失败,但是本次的元组可能与上一次不同了。这个时候,就需要使用本次数据覆盖上次处理结果。也就是说,prevValue值不变,value的值改为prevValue加2,txid不变,最后的结果如下:

{
  "value": 3,
  "prevValue": 1,
  "txid": 2
}

这种方式的可行性依赖于Trident的强顺序性。也就是说,一旦开始处理一个新的批次,就不会重复执行上一个批次。不透明事务型Spout保证了不同批次之间没有重复的情况,也就是每个元组只会在一个批次中处理成功,所以就可以放心的使用前一个值与当前值覆盖已存数据了。

非事务型Spout

非事务型Spout不能为批次提供任何保证。所以可能出现”至多一次”的处理,即在某个批次处理过程中失败了,但是不会在重新处理;也可能提供“至少一次”的处理,即可能会有多个批次分别处理某个元组。也就是没有办法实现“恰好一次”的语义。

不同类型spout和状态总结

下面是不同的spout/状态组合是否支持“恰好一次”处理语义:

Spouts vs States

不透明事务状态有最强的容错性,但是因为存储txid和两个结果带来更大的开销。事务型状态只需要存储一个状态结果,但是只对事务型Spout有效。非事务型状态要求存储的数据更少,但是不能实现“恰好一次”的处理语义。

所以在选择容错与存储空间中,需要根据具体的需要选择合适的组合。

状态API

根据前面来看,“恰好一次”语义的原理有些复杂,但是作为用户,并不需要了解这些txid对比、多值存储,因为Trident已经在State中封装了所有容错处理逻辑,只需要想下面着用携带码就行:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(MemcachedState.opaque(serverLocations), new Count(), new Fields("count"))
                .parallelismHint(6);

所有的不透明事务状态逻辑已经封装在MemcachedState.opaque中,另外,状态更新会自动调整为批次操作,这样可以减少与数据库之间反复交互带来的资源浪费。

基本的State接口只有两个方法:

public interface State {
    void beginCommit(Long txid); // 对于像DRPC流发生的partitionPersist这样的事情,可以是null
    void commit(Long txid);
}

前面已经说过,状态更新开始和结束时都会获取txid。Trident并不关心状态如何操作,使用哪种方式更新,使用哪种方式读取。

假如有一个包含用户地址信息的定制数据库,需要使用Trident与数据库交互,State扩展类中包含对于用户信息的getter和setter方法:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocation(long userId, String location) {
        // 向数据库设置地址信息
    }

    public String getLocation(long userId) {
        // 从数据库中获取地址信息
    }
}

然后就需要一个StateFactory来创建Trident所需的State对象,LocationDB所需的StateFactory大体结构如下:

public class LocationDBFactory implements StateFactory {
    public State makeState(Map conf, int partitionIndex, int numPartitions) {
        return new LocationDB();
    }
}

Trident提供了用于查询状态源的QueryFunction接口,以及更新状态源的StateUpdater接口。比如,查询LocationDB中用户信息的QueryLocation

TridentTopology topology = new TridentTopology();
TridentState locations = topology.newStaticState(new LocationDBFactory());
topology.newStream("myspout", spout)
        .stateQuery(locations, new Fields("userid"), new QueryLocation(), new Fields("location"));

QueryLocation的代码如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<String> ret = new ArrayList();
        for (TridentTuple input : inputs) {
            ret.add(state.getLocation(input.getLong(0)));
        }
        return ret;
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

QueryFunction操作分为两步:首先,Trident会将收集到的数据放在一个批次中,发送给batchRetrieve方法。在这个例子中,batchRetrieve方法收到的是一些用户id。batchRetrieve会返回一组与输入元组长度相同的结果。输入元组与输出结果中各个元素是彼此对应的。

从这点来看,上面的LocationDB类并没有发挥Trident批处理优势,所以需要尽心改造:

public class LocationDB implements State {
    public void beginCommit(Long txid) {
    }

    public void commit(Long txid) {
    }

    public void setLocationsBulk(List<Long> userIds, List<String> locations) {
        // set locations in bulk
    }

    public List<String> bulkGetLocations(List<Long> userIds) {
        // get locations in bulk
    }
}

对应的QueryLocation类如下:

public class QueryLocation extends BaseQueryFunction<LocationDB, String> {
    public List<String> batchRetrieve(LocationDB state, List<TridentTuple> inputs) {
        List<Long> userIds = new ArrayList<Long>();
        for (TridentTuple input : inputs) {
            userIds.add(input.getLong(0));
        }
        return state.bulkGetLocations(userIds);
    }

    public void execute(TridentTuple tuple, String location, TridentCollector collector) {
        collector.emit(new Values(location));
    }
}

这段代码大幅减少了数据库操作。

对于更新状态,可以使用StateUpdater接口。比如下面的更新操作:

public class LocationUpdater extends BaseStateUpdater<LocationDB> {
    public void updateState(LocationDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<Long> ids = new ArrayList<Long>();
        List<String> locations = new ArrayList<String>();
        for (TridentTuple t : tuples) {
            ids.add(t.getLong(0));
            locations.add(t.getString(1));
        }
        state.setLocationsBulk(ids, locations);
    }
}

对应的更新操作拓扑中就可以是这样:

TridentTopology topology = new TridentTopology();
TridentState locations =
        topology.newStream("locations", locationsSpout)
                .partitionPersist(new LocationDBFactory(), new Fields("userid", "location"), new LocationUpdater());

partitionPersist方法会更新状态,StateUpdater接口接收一批元组和状态信息,然后更新状态。上面的LocationUpdater类中仅仅是从元组中抓取用户id和地址信息,然后对状态执行批量处理。然后,partitionPersist会返回一个表示更新状态后的TridentState对象。随后就可以在拓扑的其他地方使用stateQuery方法查询状态。

StateUpdaterupdateState方法中有一个TridentCollector参数,这个对象是可以将发送进来的元组发送到一个新的数据流中。在这个例子中没有用到。如果需要进行比如向数据库更新计数值的后续操作,可以通过TridentState#newValuesStream方法获取新的数据流数据。

persistentAggregate

Trident使用一个名为persistentAggregate的方法更新状态。前面已经出现过,这里再写一遍:

TridentTopology topology = new TridentTopology();
TridentState wordCounts =
        topology.newStream("spout1", spout)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

partitionPersist是一个接收Trident聚合器作为参数并对状态数据进行更新的方法,persistentAggregate就是构建于partitionPersist上层的一个编程抽象。在这个例子中,通过groupBy返回一个分组数据,Trident需要一个实现MapState接口的对象。分组字段是状态的key,聚合结果是状态的value。MapState接口如下:

public interface MapState<T> extends State {
    List<T> multiGet(List<List<Object>> keys);
    List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

如果你需要在未分组的数据流上执行聚合操作时,Trident需要一个实现Snapshottable接口的对象:

public interface Snapshottable<T> extends State {
    T get();
    T update(ValueUpdater updater);
    void set(T o);
}

MemoryMapStateMemcachedState 都实现了这些接口.

实现MapState接口

实现MapState接口非常简单,Trident几乎把所有事都做完了。OpaqueMapTransactionalMapNonTransactionalMap都分别实现了各自的容错语义。只需要为这些类提供一个用于对不同key/value进行批量获取、批量修改的IBackingMap实现就行。IBackingMap接口如下:

public interface IBackingMap<T> {
    List<T> multiGet(List<List<Object>> keys);
    void multiPut(List<List<Object>> keys, List<T> vals);
}

OpaqueMap会使用OpaqueValue作为vals参数调用multiPut方法;TransactionalMap会使用TransactionalValue作为参数;NonTransactionalMaps会直接把拓扑对象传入。

Trident还提供了CachedMap类来实现key/value的自动LRU缓存操作。

最后,Trident还提供了SnapshottableMap类,该类通过将全局聚合的结果存入一个固定key中的方法将MapState对象转化为Snapshottable对象。

可以参考MemcachedState的实现来了解如何将这些工具结合在一起来提供一个高性能的MapState实现。MemcachedState支持不透明事务、事务和非事务语义。


个人主页: https://www.howardliu.cn
个人博文: storm笔记:Trident状态
CSDN主页: http://blog.csdn.net/liuxinghao
CSDN博文: storm笔记:Trident状态