Posted on: March 10, 2021 10:13 PM
Posted by: Renato
Views: 2800
Como buscar dados do AWS Athena usando PHP [Laravel(PHP)]
Como buscar dados do AWS Athena usando PHP [Laravel(PHP)]
`How to fetch data from AWS Athena using PHP`
# O que é AWS Athena? [What is AWS Athena?]
R: O Amazon Athena é um serviço de consultas interativas que facilita a análise de dados no Amazon S3 usando SQL padrão. O Athena não precisa de servidor. Portanto, não há infraestrutura para gerenciar e você paga apenas pelas consultas executadas. O Athena é fácil de usar.
Video: https://www.youtube.com/watch?v=X9-ahfoLEaM
COMO FUNCIONA E O QUE É O AMAZON ATHENA
O Amazon Athena é um serviço que permite que um analista de dados realize consultas interativas na nuvem pública da AWS em dados armazenados no S3. Como o Athena é um serviço de consulta sem servidor, um analista não precisa gerenciar nenhuma infraestrutura de computação para usá-lo.
Também não há necessidade de carregar dados S3 no Amazon Athena ou transformá-los para análise, o que torna mais fácil e rápido para um analista obter insight. Um analista de dados acessa a Athena por meio do Console de gerenciamento da AWS, de uma interface de programação de aplicativos (API) ou de um driver de conexão JAVA. Apenas defina o esquema e comece a executar consultas SQL nos dados do S3.
Um administrador pode gerenciar o acesso à Athena por meio das políticas de gerenciamento de identidades e acesso da AWS, listas de controle de acesso e políticas de bucket do Amazon S3. Um usuário do Athena pode consultar dados criptografados com chaves gerenciadas pelo AWS Key Management Service e também pode criptografar os resultados da consulta. O Athena também permite o acesso entre contas aos buckets do S3 de outro usuário.
Além disso, a Athena usa catálogos de dados gerenciados para armazenar informações e esquemas relacionados às suas pesquisas nos dados do Amazon S3.
Tipos de dados suportados e integração
A Amazon Athena conta com o mecanismo de consulta SQL de código aberto distribuído pela Presto para permitir análises rápidas ad-hoc e solicitações mais complexas, incluindo funções de janela, grandes junções e agregações . A Athena pode processar tipos de dados estruturados e não estruturados, incluindo formatos como CSV, JSON, ORC, Parquet e Avro. O Athena também suporta dados compactados nos formatos Snappy, Zlib, LZO e GZIP.
O Athena integra-se a outros serviços no portfólio da AWS. Por exemplo, você pode usá-lo com o Amazon QuickSight para visualizar dados ou com o AWS Glue para ativar recursos de catálogo de dados mais sofisticados, como repositório de metadados, esquema automatizado e reconhecimento de partições e pipelines de dados baseados em Python . A própria Athena usa o Amazon S3 como um armazenamento de dados subjacente, que fornece redundância de dados.
Aqui abaixo vou apresentar como fiz para poder usar com PHP o Athenas da AWS
```sqlSELECT * FROM "sampledb"."elb_logs" limit 10;```
Rotas:
- http://localhost:8000/api/athena/get
```phpRoute::prefix('athena')->group(function () { Route::post('get', [AthenaController::class, 'getData']);});``` ```
Body:```json{ "machine": "Case_Erector_Machine"} ```
Aqui um modelo de exemplo que funcionou perfeito. Usar com Laravel(PHP) o controller
```php use Illuminate\Http\Request;use AWS;
class AthenaController extends Controller{ public function getData(Request $request) { $params = $request->all(); try { $athena = AWS::createClient('athena'); $result1 = $athena->StartQueryExecution( array( "QueryExecutionContext" => array("Database" => 'demo_db'), //"QueryString" => 'SELECT * FROM flow_pkg_machine ORDER BY timestamp DESC limit 50', "QueryString" => "SELECT * FROM flow_pkg_machine WHERE machine = '" . $params['machine'] . "' ORDER BY timestamp DESC LIMIT 50", "ResultConfiguration" => array( "EncryptionConfiguration" => array("EncryptionOption" => "SSE_S3"), "OutputLocation" => 's3://..........', ), ) );
$QueryExecutionId = $result1->get('QueryExecutionId'); $this->waitForQueryToComplete($QueryExecutionId);
$result1 = $athena->GetQueryResults(array( 'QueryExecutionId' => $QueryExecutionId, 'MaxResults' => 500, ));
$data = $result1->get('ResultSet'); $res = $data['Rows']; $records = [];
for ($i = 0; $i < count($res); $i++) { $record = [ "timestamp" => intval($res[$i]['Data'][0]['VarCharValue']), "machine" => $res[$i]['Data'][1]['VarCharValue'], "variable" => $res[$i]['Data'][2]['VarCharValue'], "value" => (double)$res[$i]['Data'][3]['VarCharValue'], ]; array_push($records, $record); } array_shift($records);
return response()->json(['data' => $records]);
} catch (AwsException $e) { error_log($e->getMessage()); }
} public function waitForQueryToComplete($QueryExecutionId) { $athena = AWS::createClient('athena'); while (1) { $result = $athena->getQueryExecution(array('QueryExecutionId' => $QueryExecutionId)); $res = $result->toArray();
//echo $res['QueryExecution']['Status']['State'].''; if ($res['QueryExecution']['Status']['State'] == 'FAILED') { echo 'Query Failed'; die; } else if ($res['QueryExecution']['Status']['State'] == 'CANCELED') { echo 'Query was cancelled'; die; } else if ($res['QueryExecution']['Status']['State'] == 'SUCCEEDED') { break; // break while loop }
} }
}
```
Metodo que quais funcionou aqui abaixo:
```phppublic function index() { $id = 0;
$athena = AWS::createClient('athena'); //\Log::info(json_encode($athena));
$query = 'SELECT * FROM "flow_pkg" limit 10'; //\Log::info(json_encode($query));
$result = $athena->startQueryExecution([ 'QueryExecutionContext' => [ 'Database' => 'demo_db', ], 'QueryString' => 'SELECT * FROM flow_pkg_machine LIMIT 10', // REQUIRED 'ResultConfiguration' => [ // REQUIRED 'EncryptionConfiguration' => [ 'EncryptionOption' => 'SSE_S3', // REQUIRED ], 'OutputLocation' => 's3://teste-demo/demo/tables/', // REQUIRED ], ]);
// check completion : getQueryExecution() $exId = $result['QueryExecutionId']; //$id = $result['QueryExecutionId'];
$checkExecution = $athena->getQueryExecution([ 'QueryExecutionId' => $exId, // REQUIRED ]); //dd($checkExecution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]); //sleep(10);
/*switch ($checkExecution["QueryExecution"]["Status"]["State"]) { case 'SUCCEEDED': dump('ok!'); break; case 'RUNNING': dump('sleep(30)'); sleep(30); if ($checkExecution["QueryExecution"]["Status"]["State"] == 'RUNNING') { sleep(20); } elseif($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') { sleep(30); } break; case 'QUEUED': sleep(20); break; case 'FAILED': dump('FAILED'); break; default: # code... sleep(20); break; }*/
do { if ($checkExecution["QueryExecution"]["Status"]["State"] == 'RUNNING') { dump('sleep(30)'); sleep(30); break; } elseif ($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') { sleep(30); if ($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') { sleep(20); }
}
} while (0);
if ($checkExecution["QueryExecution"]["Status"]["State"] == 'SUCCEEDED') // 'QUEUED|RUNNING|SUCCEEDED|FAILED|CANCELLED', { //dd($result['QueryExecutionId']); $dataOutput = $athena->getQueryResults([ 'QueryExecutionId' => $result['QueryExecutionId'], // REQUIRED ]); dd($dataOutput); while (($data = fgetcsv($dataOutput, 1000, ",")) !== false) { $num = count($data); //dd($data); echo "
$num fields in line $row:
\n"; $row++; for ($c = 0; $c < $num; $c++) { echo $data[$c] . "\n"; } }
} // elseif($checkExecution["QueryExecution"]["Status"]["State"] == 'QUEUED') { // sleep(40); // } else { // sleep(10); // }
return response()->json(['status' => $checkExecution["QueryExecution"]["Status"]["State"]]);
}```
Outro metodo que tambem utilizei, mas não retornou valores que precisava.```php use AWS;
class athenaController extends Controller{ public function getData() { try { $athena = AWS::createClient('athena'); $result1 = $athena->StartQueryExecution( array( "QueryExecutionContext" => array("Database" => 'demo_db'), "QueryString" => 'SELECT * FROM flow_pkg LIMIT 10', "ResultConfiguration" => array( "EncryptionConfiguration" => array("EncryptionOption" => "SSE_S3"), "OutputLocation" => 's3://........, // '. $result_logs, ), ) );
$QueryExecutionId = $result1->get('QueryExecutionId'); \Log::info($QueryExecutionId);
$this->waitForQueryToComplete($QueryExecutionId); //dd($QueryExecutionId);
$result1 = $athena->GetQueryResults(array( 'QueryExecutionId' => $QueryExecutionId, // REQUIRED 'MaxResults' => 500, )); \Log::info(json_encode($result1));
$data = $result1->get('ResultSet'); $res = $data['Rows']; json_encode($data['Rows']); //dd($res);
return response()->json($data['Rows']);
while (true) {
if ($result1->get('NextToken') == null) { break; }
$result1 = $athena->GetQueryResults(array( 'QueryExecutionId' => $QueryExecutionId, // REQUIRED 'NextToken' => $result1->get('NextToken'), 'MaxResults' => 500, ));
$data = $result1->get('ResultSet'); $res = array_merge($res, $data['Rows']); }
$resData = $this->processResultRows($res);
return $resData;
} catch (\Exception $e) { // Catch an S3 specific exception. echo $e->getMessage(); } catch (AwsException $e) { // This catches the more generic AwsException. You can grab information // from the exception using methods of the exception object.
echo $e->getAwsRequestId() . "\n"; echo $e->getAwsErrorType() . "\n"; echo $e->getAwsErrorCode() . "\n"; error_log($e->getMessage()); }
} public function waitForQueryToComplete($QueryExecutionId) { $athena = AWS::createClient('athena'); while (1) { $result = $athena->getQueryExecution(array('QueryExecutionId' => $QueryExecutionId)); $res = $result->toArray();
//echo $res['QueryExecution']['Status']['State'].''; if ($res['QueryExecution']['Status']['State'] == 'FAILED') { echo 'Query Failed'; die; } else if ($res['QueryExecution']['Status']['State'] == 'CANCELED') { echo 'Query was cancelled'; die; } else if ($res['QueryExecution']['Status']['State'] == 'SUCCEEDED') { break; // break while loop }
} }
/* * function to process data */ public function processResultRows($res) { $result = array(); $resul_array = array();
for ($i = 0; $i <= count($res); $i++) { for ($n = 0; $n < count($res[$i]['Data']); $n++) { if ($i == 0) { $result[] = $res[$i]['Data'][$n]['VarCharValue']; } else { $resul_array[$i][$result[$n]] = $res[$i]['Data'][$n]['VarCharValue']; } } }
echo 'resul_array_cnt: '.count($resul_array).''; return $resul_array; }
}
```
Fontes onde pesquisei:
- https://amolmatkars.wordpress.com/2017/07/20/how-to-fetch-data-from-aws-athena-using-php/comment-page-1/?unapproved=169&moderation-hash=486731d61ff8ca48b1414df250865850#comment-169
- https://aws.amazon.com/documentation/sdk-for-php/
- https://docs.aws.amazon.com/athena/latest/ug/select.html
- https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-athena-2017-05-18.html#getqueryresults
- https://www.php.net/manual/pt_BR/function.array-shift.php
- https://titanwolf.org/Network/Articles/Article?AID=d9e0d23a-28a8-4390-8a33-7513fcc45f64#gsc.tab=0
- https://docs.aws.amazon.com/aws-sdk-php/v3/api/api-athena-2017-05-18.html#startqueryexecution
- https://stackoverflow.com/questions/47568281/return-json-from-athena-query-via-api
- https://stackoverflow.com/questions/55345016/how-to-use-athena-with-laravel/66548012#66548012
- https://packagist.org/packages/alipeng/laravel-athena
- https://github.com/alipeng/laravel-athena
- https://github.com/flemzord/laravel-athena
- https://github.com/Influo/laravel-athena
- https://mugnos-it.com/
# Exemplo que não deu certo
```phpOur database config:
'test_athena' => [ 'driver' => 'odbc', 'dsn' => 'odbc:Driver=/opt/simba/athenaodbc/lib/64/libathenaodbc_sb64.so;' .'AwsRegion=us-east-1;' .'AuthenticationType=IAM Credentials;' .'UID=;' .'PWD=;' .'S3OutputLocation=s3:///;', 'host' => env('ATHENA_HOST', 'localhost'), 'port' => env('ATHENA_PORT', '5432'), 'database' => env('ATHENA_DATABASE', 'forge'), 'username' => env('ATHENA_USERNAME', 'forge'), 'password' => env('ATHENA_PASSWORD', ''), 'charset' => 'utf8', 'prefix' => '', 'schema' => 'public', 'options' => [ \PDO::ATTR_EMULATE_PREPARES => true, ],```
```php $pdo = DB::connection('test_athena')->getPdo();
// this crashes$pdo->prepare('select * from "qa_lines_csv" where "id" > ? limit 1')->execute([100]);
// this does not crash$pdo->prepare('select * from "qa_lines_csv" where "id" > 100 limit 1')->execute([]);
// this crashesDB::connection('test_athena')->table('qa_lines_csv')->where('id', '>', 100)->first()```
Donate to Site
Renato
Developer