11.4 Spark与Redis整合原理与实战

本节通过生产环境实战案例讲解Spark与Redis整合原理及Spark与Redis整合实战。

11.4.1 Spark与Redis整合原理

Redis是一个开源项目(BSD许可)。Redis以内存数据结构存储,用作数据库、缓存和消息代理。它支持数据结构,如字符串、散列、列表、集合、具有范围查询的排序集、位图、超文本和具有半径查询的地理空间索引等。Redis内置复制、Lua脚本、LRU eviction、事务和不同级别的磁盘持久性,通过Redis Sentinel提供高可用性,并通过Redis Cluster进行自动分区。

Redis可以对这些类型运行原子操作,如附加到字符串、在哈希中增加值、将元素推送到列表中、计算集交集、联合与差异于一体;或者在排序集中获得最高排名的成员。

为了实现其卓越的性能,Redis使用内存中的数据集。根据业务用例,可以通过将数据集一次性转储到磁盘中,或通过将每个命令附加到日志来持久化。如果只需要功能丰富的网络内存缓存,则可以选择禁用持久性。

Redis还支持简单的主从异步复制,第一次同步非阻塞的速度非常快,网络切分传输时可自动重连同步。

Redis的其他功能包括:

 事务Transactions。

 发布/订阅。

 Lua脚本。

 Keys with a limited time-to-live。

 LRU eviction of keys。

 自动故障切换。

大多数编程语言可以使用Redis。

Redis以ANSI C编写,适用于大多数POSIX系统,如Linux、* BSD、OS X,无需外部依赖。Linux和OS X是Redis开发和测试的两个操作系统,建议使用Linux进行部署。Redis可能在诸如SmartOS的Solaris衍生系统中工作,但支持是尽力而为的。没有官方支持Windows版本,但是Microsoft开发并维护了Redis的Win-64端口。

11.4.2 Spark与Redis整合实战

本节以生产环境中Spark与Redis整合实战案例来讲解。在通信运营商的Spark大数据项目中,Spark每分钟实时读取Hdfs中的话单数据,经过业务逻辑代码分析转换后,将清洗以后的数据转换成一个List字符串列表,然后遍历List字符串列表,将每条记录拼接成一个Key-Value字符串放入Redis队列。

Spark与Redis整合实战案例实现步骤如下。

(1)通过Maven方式下载Redis的jedis 2.6.0 Jar包。

pom.xml文件增加以下内容:

1.  <dependency>
2.        <groupId>redis.clients</groupId>
3.        <artifactId>jedis</artifactId>
4.        <version>2.6.0</version>
5.    </dependency>

在Spark中导入Redis的Jar包。

1.  import redis.clients.jedis.Jedis;
2.  import redis.clients.jedis.JedisPool;
3.  import redis.clients.jedis.JedisPoolConfig;

(2)在项目config.properties配置文件中增加Redis的主机地址、端口、密码,以及Redis连接池分配的连接数、等待时间等信息。

1.  ## REDIS
2.  redis.ip=100.*.*.100
3.  redis.port=6379
4.  redis.password=password
5.  ......
6.  ## redis
7.  #最大分配的对象数
8.  redis.pool.maxTotal=1024
9.  #最大能够保持idle状态的对象数
10. redis.pool.maxIdle=200
11. #当池内没有返回对象时,最大等待时间
12. redis.pool.maxWait=1000
13. #当调用borrow Object方法时,是否进行有效性检查
14. redis.pool.testOnBorrow=true
15. #当调用return Object方法时,是否进行有效性检查
16. redis.pool.testOnReturn=true
17.

(3)编写RedisServiceImpl实现类,从配置文件中获取Redis的主机地址、端口、密码等信息;编写getFromPool方法从redis访问池中获取redis实例;编写getSingle方法获取redis实例。

1.   public class RedisServiceImpl {
2.      private static final Map<String, String> REDIS_CONFIG = Config.
        getInstance().getRedisParams();
3.      private static final String REDIS_IP = REDIS_CONFIG.get("redis.ip");
4.      private static final String REDIS_PORT = REDIS_CONFIG.get("redis.port");
5.      private static final String REDIS_PASSWORD = REDIS_CONFIG.get("redis.
        password");
6.      private static final int REDIS_TIMEOUT = 2000;
7.      private static Logger log = LoggerFactory.getLogger(RedisServiceImpl.class);
8.      private static JedisPool pool;
9.      /**
          * 从redis访问池中获取redis实例
          *
          * @return redis实例
10.       */
11.     public static Jedis getFromPool() {
12.         if (pool == null) {
13.             JedisPoolConfig config = new JedisPoolConfig();
14.             config.setMaxTotal(Integer.valueOf(REDIS_CONFIG.get("redis.
                 pool.maxTotal")));
15.             config.setMaxIdle(Integer.valueOf(REDIS_CONFIG.get("redis.
                 pool.maxIdle")));
16.             config.setMaxWaitMillis(Integer.valueOf(REDIS_CONFIG.get("redis.
                 pool.maxWait")));
17.             config.setTestOnBorrow(Boolean.valueOf(REDIS_CONFIG.get("redis.
                 pool.testOnBorrow")));
18.             config.setTestOnReturn(Boolean.valueOf(REDIS_CONFIG.get("redis.
                 pool.testOnReturn")));
19.             pool = new JedisPool(config, REDIS_IP, Integer.valueOf(REDIS_
                PORT), REDIS_TIMEOUT, REDIS_PASSWORD);
20.         }
21.         return pool.getResource();
22.     }
23.
24.     public static void returnResource(Jedis redis) {
25.         pool.returnResource(redis);
26.     }
27.
28.     /**
          * 获取redis实例
          *
          * @return redis实例
29.       */
30.
31.     public static Jedis getSingle() {
32.         Jedis redis = new Jedis(REDIS_IP, Integer.valueOf(REDIS_PORT),
            REDIS_TIMEOUT);
33.         redis.auth(REDIS_PASSWORD);
34.         return redis;
35.     }
36. }

(4)编写项目的业务代码,RedisBean类数据结构用于要存放Redis的数据。

1.  public class RedisBean {
2.      public void setKey(String key) {
3.         this.key = key;
4.     }
5.
6.     public void setValue(String value) {
7.         this.value = value;
8.     }
9.
10.    public String getKey() {
11.        return key;
12.    }
13.
14.    public String getValue() {
15.        return value;
16.    }
17.
18.    protected String key;
19.    protected String value;
20.
21. }

根据项目的业务需求,将Spark中提取转换后的每条记录存入业务数据结构RedisBean的key、value中;然后加入到RedisBeanList列表中。RedisBeanList是List<RedisBean> RedisBeanList类型。

1.  if ("RedisTestQUALINFO".equals(keyType)) {
2.         RedisBean.key = "RedisTestQUALINFO";
3.         RedisBean.value = RedisTestReslut.toString();
4.         RedisBeanList.add(RedisBean);
5.     }

(5)最终调用业务方法addToRedis,通过RedisServiceImpl.getSingle()获取Redis实例,然后循环遍历List<RedisBean>的每个元素,调用redis.lpush方法分别将Key值、Value值lpush到Redis中。lPush完成以后,通过redis.close关闭连接。

1.     public static void addToRedis(List<RedisBean> redisBeanList) {
2.     Jedis redis = RedisServiceImpl.getSingle();
3.     for (RedisBean redisData : redisBeanList) {
4.         redis.lpush(redisData.getKey(), redisData.getValue());
5.     }
6.     redis.close();
7.  }

(6)Redis业务验证:可以登录到Redis系统中,查询数据已持久化至Redis。

1.   redis-cli –h 100.*.*.100  -p 6379 -a 'password'
2.  select 0  ---切换到0库
3.  keys *    ---列出所有的key
4.  lrange CDNNODEQUALINFO 0 -1 查看所有的记录