// Start of the sink's invoke for a (key, featureKVs) pair: destructures the
// tuple and passes featureKVs to getHash, binding the result to featureHash.
// NOTE(review): the method body is not closed on this line; the rest of the
// definition is not part of this fragment -- confirm against the full file.
overridedefinvoke( value: (String, String), context: SinkFunction.Context ): Unit = { val (key, featureKVs) = value val featureHash = getHash(featureKVs)
/**
 * Writes one record to Redis as a hash using a single HMSET.
 *
 * The first tuple element is used as the Redis key and the last element is
 * passed to `getHash` to build the field/value map that is stored.
 * NOTE(review): the middle `Timestamp` element is ignored here; the original
 * two-name destructuring did not match the three-element tuple, so the
 * intended field mapping should be confirmed.
 *
 * @param value   (key, timestamp, featureKVs) triple
 * @param context sink context supplied by the runtime (not read here)
 * @throws Throwable any failure from borrowing the connection or from HMSET,
 *                   rethrown after being logged
 */
override def invoke(
    value: (String, Timestamp, String),
    context: SinkFunction.Context
): Unit = {
  val (key, _, featureKVs) = value
  val featureHash = getHash(featureKVs)

  var jedis: Jedis = null
  try {
    jedis = jedisPool.getResource
    jedis.hmset(key, featureHash)
  } catch {
    case e: Throwable =>
      LOG.error(
        "Cannot HMSET key={} hash={} error message {}",
        key,
        featureHash,
        e.getMessage
      )
      throw e
  } finally {
    // jedis is still null when getResource itself failed; closing it then
    // would only raise a NullPointerException and log a misleading error.
    if (jedis != null) {
      try {
        jedis.close()
      } catch {
        case e: Throwable =>
          LOG.error("Failed to close jedis instance", e)
      }
    }
  }
}
/**
 * Creates the Jedis connection pool and stores it in `jedisPool`.
 *
 * A failure while constructing the pool is logged and then rethrown.
 *
 * @param parameters configuration passed by the runtime (not read here)
 */
override def open(parameters: Configuration): Unit = {
  try {
    jedisPool = new JedisPool()
  } catch {
    case e: Throwable =>
      LOG.error("Redis has not been properly initialized: ", e)
      throw e
  }
}
/** Closes the Jedis pool if one was created; does nothing when it is null. */
override def close(): Unit = {
  Option(jedisPool).foreach(_.close())
}
} // closes the enclosing definition, whose header lies outside this block
/**
 * Appends the element to the buffer and, once the buffer holds at least
 * `threshold` elements, iterates over it and then empties it.
 *
 * @param value   element to buffer
 * @param context sink context supplied by the runtime (not read here)
 */
override def invoke(value: (String, Int), context: Context): Unit = {
  bufferedElements += value
  // `>=` rather than `==`: if the buffer ever grows past `threshold` through
  // a path other than this method, an equality test would never fire again.
  if (bufferedElements.size >= threshold) {
    for (element <- bufferedElements) {
      // send it to the sink
    }
    bufferedElements.clear()
  }
}
/**
 * Appends the element to the buffer and, once the buffer holds at least
 * `threshold` elements, iterates over it and then empties it.
 *
 * @param value   element to buffer
 * @param context sink context supplied by the runtime (not read here)
 */
override def invoke(value: (String, Int), context: Context): Unit = {
  bufferedElements += value
  // `>=` rather than `==`: if the buffer ever grows past `threshold` through
  // a path other than this method, an equality test would never fire again.
  if (bufferedElements.size >= threshold) {
    for (element <- bufferedElements) {
      // send it to the sink
    }
    bufferedElements.clear()
  }
}
/**
 * Replaces the contents of `checkpointedState` with the elements currently
 * held in `bufferedElements`.
 *
 * @param context snapshot context supplied by the runtime (not read here)
 */
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointedState.clear()
  bufferedElements.foreach(element => checkpointedState.add(element))
}
// Start of initializeState: builds a list-state descriptor named
// "buffered-elements" typed for (String, Int) elements.
// NOTE(review): the method body is not closed on this line and the descriptor
// is not used within the visible part; the rest of the definition is not part
// of this fragment -- confirm against the full file.
overridedefinitializeState(context: FunctionInitializationContext): Unit = { val descriptor = newListStateDescriptor[(String, Int)]( "buffered-elements", TypeInformation.of(newTypeHint[(String, Int)]() {}) )
/**
 * Buffers one record and, once at least `threshold` records are buffered,
 * writes them all to Redis through a single pipeline of HMSET commands.
 *
 * The buffer is cleared only after the pipeline has been synced without an
 * exception; on failure the error is logged and rethrown with the buffer
 * left intact.
 *
 * @param value   (key, featureKVs) pair; featureKVs is converted with `getHash`
 * @param context sink context supplied by the runtime (not read here)
 * @throws Throwable any failure from borrowing the connection or from the
 *                   pipeline, rethrown after being logged
 */
override def invoke(
    value: (String, String),
    context: SinkFunction.Context
): Unit = {
  val (key, featureKVs) = value
  val featureHash = getHash(featureKVs)

  bufferedElements += (key -> featureHash)

  // `>=` rather than `==`: if the buffer already exceeds `threshold` (e.g.
  // elements re-added on restore), an equality test would never fire again.
  if (bufferedElements.size >= threshold) {
    var jedis: Jedis = null
    try {
      jedis = jedisPool.getResource
      val pipeline = jedis.pipelined()
      // distinct names so the loop does not shadow the outer `key`
      for ((bufferedKey, bufferedHash) <- bufferedElements) {
        pipeline.hmset(bufferedKey, bufferedHash)
      }
      pipeline.sync()
    } catch {
      case e: Throwable =>
        LOG.error(
          "Pipelining failed with error message {}",
          e.getMessage
        )
        throw e
    } finally {
      // jedis is still null when getResource itself failed; closing it then
      // would only raise a NullPointerException and log a misleading error.
      if (jedis != null) {
        try {
          jedis.close()
        } catch {
          case e: Throwable =>
            LOG.error("Failed to close jedis instance", e)
        }
      }
    }
    bufferedElements.clear()
  }
}
/**
 * Replaces the contents of `checkpointedState` with the elements currently
 * held in `bufferedElements`.
 *
 * @param context snapshot context supplied by the runtime (not read here)
 */
override def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointedState.clear()
  bufferedElements.foreach(element => checkpointedState.add(element))
}
/**
 * Obtains the "buffered-elements" list state and, when the context reports a
 * restore, copies its elements back into `bufferedElements`.
 *
 * @param context initialization context supplied by the runtime
 */
override def initializeState(context: FunctionInitializationContext): Unit = {
  val descriptor =
    new ListStateDescriptor[(String, java.util.Map[String, String])](
      "buffered-elements",
      TypeInformation.of(
        new TypeHint[(String, java.util.Map[String, String])]() {}
      )
    )

  // The descriptor was previously built but never used, leaving
  // `checkpointedState` unset before it was read below.
  // NOTE(review): assumes `checkpointedState` is a reassignable field -- confirm.
  checkpointedState = context.getOperatorStateStore.getListState(descriptor)

  if (context.isRestored) {
    for (element <- checkpointedState.get().asScala) {
      bufferedElements += element
    }
  }
}
/**
 * Creates the Jedis connection pool and stores it in `jedisPool`.
 *
 * A failure while constructing the pool is logged and then rethrown.
 *
 * @param parameters configuration passed by the runtime (not read here)
 */
override def open(parameters: Configuration): Unit = {
  try {
    jedisPool = new JedisPool()
  } catch {
    case e: Throwable =>
      LOG.error("Redis has not been properly initialized: ", e)
      throw e
  }
}
/** Closes the Jedis pool if one was created; does nothing when it is null. */
override def close(): Unit = {
  Option(jedisPool).foreach(_.close())
}
} // closes the enclosing definition, whose header lies outside this block