本文内容部分来自Trident Tutorial

Trident是基于Storm的实时计算模型的高级抽象。它可以实现高吞吐(每秒数百万条消息)的有状态流处理和低延迟分布式查询。如果以前使用过高级批处理工具(比如Pig或Cascading),则对Trident的概念会非常熟悉,比如连接、聚合、分组、功能处理和过滤等。除此之外,Trident还增加了用于在数据库或持久化存储上进行有状态的增量处理的原语。Trident具有一致性、一次性语义,所以很容易就能够推导出Trident拓扑结构。

Trident的出现算是程序猿非常懒的又一个铁证。Strom是一个实时流处理工具,有很高的吞吐。在实际应用场景中,很多场景是借助这种实时处理能力,对实时数据进行统计,然后将统计结果实时推送到大屏或者其他可以实时浏览的地方,这样领导或者活动运营就可以实时查看销售或活动情况,比如,双十一时候的大屏,就可以使用Storm来做(我们现在就是这样做的,把全渠道的销售情况进行实时统计,然后显示在大屏上,据说领导会看)。然后,程序猿们就发现,很多统计功能非常类似,所以进行抽象,使用更加高级的功能代替一个一个的Spout、Bolt(当然,Trident拓扑结构运行的时候也是解析成Spout和Bolt运行)。

然后又有人发现,Trident这种方式也是比较麻烦,即使程序猿们通过高级抽先的Trident省去了很多麻烦,但是还是架不住运维、运营、产品等不断改变的需求,所以就有很多SQL方式解析为Trident或普通Topology的工具产生。既然运维、运营、产品等不断修改需求,那就简单的通过SQL查询(不同的SQL解析为不同的拓扑结构,在Storm中运行,可以得出不同的结果)。比如:squall

这些都是题外话,下面继续说Trident。

1 一个例子

接下来看一个Trident的例子:

  1. 统计输入句子中单词数量
  2. 实现单词统计结果的查询

首先实现一个不断发送句子的Spout:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);

上面的Spout循环发送句子流,下面是计算流中单词数量的代码,也就是Trident的本体:

TridentTopology topology = new TridentTopology(); // 1
TridentState wordCounts =
     topology.newStream("spout1", spout) // 2
       .each(new Fields("sentence"), new Split(), new Fields("word")) // 3
       .groupBy(new Fields("word")) // 4
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) // 5
       .parallelismHint(6);

一步步说下代码:

行1,创建TridentTopology对象topology,这个对象就是Trident计算的入口。

行2,TridentTopologynewStream方法是定义Trident的输入源,这里使用的是一开始定义的FixedBatchSpout对象。当然,输入源也可以是Kafka之类的队列Broker(这个实在不知道应该翻译成什么,不过好在是说Broker,大家就都明白是干啥的)。这个地方,Trident会在Zookeeper中跟踪每个输入源的状态元数据。也就是Trident中比较重要的状态的概念,这个后面再说。Trident会将输入的流分成更小的批量数据(这里比较绕口,可以理解为Trident的入口进来一个大的批量数据,然后Trident把这个大的批量数据进行分割,变成一堆小的批量数据,比如进来的是1000条,分割成10个100条)进行处理,比如,把传入的流分成下面的样子:

batched stream

通常,小批量数据的数量会是数千或数百万个,这取决于吞吐量。

Trident提供了一整套完整的批量处理API来处理这些小批量数据。类似于Hadoop的高级抽象中处理Pig或Cascading的内容:分组、连接、聚合、功能操作、过滤等。当然,分别处理每个小的批量数据并不容易(使用Hadoop处理大的矩阵乘法的就会深有体会),所以Trident提供了跨批次进行聚合处理的功能,并可以将这些聚合结果持久化在内存中、Memcached、Cassandra或其他存储中。Trident还具有一流的实时状态查询功能,这个状态可以有Trident更新,或者其他独立的状态来源。

行3,Spout发出包含名为sentence的field的流。通过使用Split函数,对每个tuple进行处理,把名为sentence的字段分割为一个个单词。将分割的单词命名为word,继续向下分发。下面是Split的定义:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

行4,Trident进行了单词计数和结果的持续存储。先对word字段进行分组,然后用Count聚合器持续聚合。

Trident的一个很强的特性是能够完全容错和一次性处理语义。Trident能够保持状态,如果发生故障需要重试,则不会对同一源数据多次更新数据。

行5,persistentAggregate函数实现了存储和更新聚合结果的功能,不需要操心。例子中,计数结果保存在内存中,当然也可以使用Memcached、Cassandra或其他持久化存储。这里先不做讨论。persistentAggregate方法将Stream转换为TridentState对象。在这里,TridentState对象表示所有单词计数,然后使用TridentState对象来实现分布式查询。

接下来实现对单词计数实现低延迟分布式查询,以空格分割的单词列表作为输入,返回这些单词的计数总和。这个查询会在后台做并行化处理,其他的与普通的RPC调用一样。比如像下面这样调用:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

如上面的代码,除了它是在Storm集群中并行执行外,与普通的RPC调用没什么区别。通常简单的RPC查询,延迟在10ms左右,复杂的DRPC查询可能需要更长的时间,具体的时间取决于计算被分配的资源多少。

拓扑中分布式查询部分的实现如下所示:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word")) // 6
       .groupBy(new Fields("word")) // 7
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) // 8
       .each(new Fields("count"), new FilterNull()) // 9
       .aggregate(new Fields("count"), new Sum(), new Fields("sum")); // 10

使用同一个TridentTopology对象来创建DRPC流,命名该函数为words,函数名与DRPCClient执行时的第一个参数名称相同。每个DRPC请求都是一个小的批量作业,将请求的单个tuple作为输入,tuple中包含名为args的字段,args中包含了客户端的请求参数。在这个例子中,请求参数就是“cat dog the man”。

行6,通过Split函数将请求参数分解为一个个单词,组成“word”流。

行7,将上一步分解的“word”流分组。

行8,stateQuery方法用于查询第一部分生成的TridentState对象。MapGet将被执行,根据输入的单词,查询单词的数量。因为DRPC流的分组方式与TridentState分组方式相同(都是通过word字段),所以每个单词查询会被自动路由到该单词的TridentState对象的分区。

行9,通过FilterNull函数过滤掉没有计数结果的单词。

行10,使用Sum函数对存在计数结果的进行加和,然后通过Trident自动将结果返回客户端。

Trident在如何以最大限度的提高性能来执行拓扑方面是非常智能的。比如,它会进行下面两个提高性能的自动化操作:

  1. 对状态的读取或写入操作(如persistentAggregatestateQuery),会自动的进行批处理。比如,如果需要在当前批处理操作中执行20次更新操作,Trident会自动批量的读取或写入,仅执行一次读请求或写请求,而不是20次。
  2. Trident聚合操作进行了大量优化。Trident会将同一个组中的所有tuple发送到同一台机器,进行部分聚合操作,然后再通过网络发送tuple。比如,Count聚合操作会先计算每个分区的数量,然后将这些统计结果通过网络传输到一起,再根据这些初始统计结果计算最后的结果。

2 再来个例子

下面的例子是一个纯DRPC拓扑,用于计算URL的覆盖范围,就是在Twitter上发布的URL影响的范围。要计算这个数据,需要先获取所有推送URL的人,然后获取所有这些人的粉丝,去重,计算总数。这种计算需要消耗非常多的资源,可能需要数千次数据库查询操作和数千万的tuple(当然,这是针对Twitter这种应用体量来说的。如果用户只有几个,一个SQL估计就出结果了)。所以就需要Storm和Trident这种可以并行化跨集群的计算。

下面这个拓扑会从两个状态读取数据:一个是将URL与分享过该URL的人做的映射,另一个是将一个人与这个人的粉丝做的映射。查询拓扑如下:

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());

topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")) // 1
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) // 2
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) // 3
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower")) // 4
       .groupBy(new Fields("follower")) // 5
       .aggregate(new One(), new Fields("one")) // 6
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

该拓扑使用newStaticState方法创建从外部数据库获取的TridentState对象,然后在拓扑中进行查询。与其他类似,这些数据库查询可以自动批量化,以求效率最大化。下面分解来说:

行1,将前面从urlToTweeters数据库读取的数据作为输入,通过MapGet函数进行处理。

行2,通过ExpandList函数将每个人分解为不同的tuple数据。

行3,将前面从tweeterToFollowers数据库读取的数据作为输入,与行2中的数据,通过MapGet函数进行处理,查询每个人的粉丝列表。这里最重要的就是并行话,所以需要shuffle来将所有人均匀分布到所有的workder上。可以看到在下面的parallelismHint中,并行数是200,说明该操作并行程度非常高,占据了计算的大部分资源。

行4、行5、行6,这里是对粉丝进行单独统计和计数。首先是粉丝列表数据通过ExpandList函数进行分解为不同tuple数据,然后将粉丝数据进行分组,在对粉丝数据进行One聚合。这里也是通过parallelismHint进行并行计算。

One聚合器定义如下:

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }

   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }

   public Integer zero() {
       return 1;
   }        
}

这是一个combiner聚合器,它能够先进行部分聚合,然后通过网络传输tuple进行最后的聚合,以最大限度地提高效率。

接下来说说Trident的数据结构。

3 字段和tuple数据

Trident中用来传输的数据模型名为TridentTuple,是一个命名的值列表。在一个拓扑中,tuple通过一系列的操作逐步创建。这些操作就是将输入数据进行处理,然后返回输出数据。

比如,有一个名为“stream”的流,它包含字段“x”,“y”和“z”。 要运行一个以“y”为输入的过滤器MyFilter,可以这样做:

stream.each(new Fields("y"), new MyFilter())

MyFilter定义如下:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}

这里保留了“y”字段小于10的所有tuple数据。MyFilter输入的TridentTuple只有“y”字段。Trident可以高效的选择一组tuple作为输入,这种选择是0消耗的。

接下来看看function fields是如何工作的,不如下面这个函数:

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}

这个操作是将两个数字作为数据,然后发出两个新值:两数字的和与积。比如有“x”,“y”和“z”三个字段的流,可以如下操作:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

function函数操作的输出结果是追加在输入tuple中的,因此经过AddAndMultiply操作之后输出tuple包含五个字段:“x”,“y”,“z”,“added”和“multipl”。“added”对应的是AddAndMultiply发出的第一个值,”multiplied”对应的是第二个值。

aggregator聚合操作是替换tuple数据,比如,有一个包含“val1”和“val2”的流:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

输入结果将只包含“sum”一个字段的tuple,表示“val2”的和。

groupBy分组操作,输出结果将包含分组字段和聚合器发出的字段,比如:

stream.groupBy(new Fields("val1"))
     .aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

例子中,输出结果包含“val2”和“sum”两个字段。

4 状态

实时计算解决问题的一个关键是如何管理状态,以便在失败或重试时能够实现幂等性。消除实时计算过程中的故障是不可能的,所以,当一个节点死亡或其他问题出现时,就需要重试。问题是,如何更新状态(通过外部存储或拓扑内部状态),以便每个消息只处理一次。

这是一个棘手的问题,有一下几种情况。假设,如果要做一个计数的聚合操作,然后将技术存储在数据库中。如果只是将结果存储,当需要更新结果的时候,无法知道这个结果是否是新结果还是重试结果。该结果可能是之前尝试过更新过的,已经成功更新了数据库,但是后续步骤中失败了。也可能是尝试更新数据库的时候,更新数据库失败了。

Trident通过下面的方式解决这个问题:

  1. 每个批次的数据被赋予唯一的ID,称为“transaction id”。如果这批数据重新计算,会携带相同的ID。
  2. 批次将状态隔离。在批次2状态更新之前,不会更新批次3的状态。

更加这两个原子操作,可以通过状态更新完成一次语义操作。需要将事务ID与计数结果一起存储,作为原子值。然后,当更新操作时,通过数据库中的事务ID与当前批次的事务ID进行比较。如果相同,说明是同一批次的数据,就跳过更新。如果不同,就增加计数。

当然,不需要在拓扑中手动执行这个逻辑,这个逻辑包装在State中并自动完成。而且,State 状态对象也不是实现事务ID所必需的,如果不想在数据库中存储事务ID,也可以不存储。这样的话,State 在失败的情况下也会至少执行一次(可能会更好)。可以在这里这里学习更多的关于State状态的内容。

State 状态可以存储在任何地方,外部存储,内部状态等。如果想使用一个内存状态实现,保留几个小时的数据可用,然后就将其丢弃,可以看这里

5 执行拓扑

Trident拓扑结果最后会编译成高效的Strom拓扑。当需要重新分区数据时,比如groupByshuffle,tuple就只能通过网络传输。所以如果Trident拓扑结构如下:

trident to storm

最后会被编译成Strom spout/bolt的结构,如下:

trident to storm

6 结论

Trident 使实时计算更加优雅。可以通过Trident的API实现高吞吐的流处理、状态处理、低延迟查询等功能。而且Trident中做了很多优化,可以获取最大性能。


个人主页: http://www.howardliu.cn

个人博文: storm笔记:Trident应用

CSDN主页: http://blog.csdn.net/liuxinghao

CSDN博文: storm笔记:Trident应用